[jira] [Created] (SPARK-48717) Python foreachBatch streaming query cannot be stopped gracefully after pin thread mode is enabled and is running spark queries

2024-06-25 Thread Wei Liu (Jira)
Wei Liu created SPARK-48717:
---

 Summary: Python foreachBatch streaming query cannot be stopped 
gracefully after pin thread mode is enabled and is running spark queries 
 Key: SPARK-48717
 URL: https://issues.apache.org/jira/browse/SPARK-48717
 Project: Spark
  Issue Type: New Feature
  Components: PySpark, SS
Affects Versions: 4.0.0
Reporter: Wei Liu


Followup of https://issues.apache.org/jira/browse/SPARK-39218

 

That change only considered the case where an InterruptedException is thrown while 
time.sleep(10) is interrupted. But when a Spark query is executing:
{code:java}
def func(batch_df, batch_id):
    batch_df.sparkSession.range(1000).write.saveAsTable("oops")
    print(batch_df.count()) {code}
the actual error would be:

 
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling 
o2141502.saveAsTable.  
: org.apache.spark.SparkException: Job aborted.  



...
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:262)
  
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)  
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:262)
  
*Caused by: java.lang.InterruptedException  
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1000)*
  
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1308)
   {code}
We should also handle this scenario.
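A minimal sketch of the extra check this needs, assuming a helper in the Python worker's 
stop-detection path; the helper name and the cause-chain walk are illustrative, not the 
actual PySpark code:
{code:python}
from py4j.protocol import Py4JJavaError

def is_graceful_interrupt(exc: BaseException) -> bool:
    # Treat a Py4JJavaError whose root cause is java.lang.InterruptedException
    # (a Spark job interrupted by query.stop()) as a graceful stop, in addition
    # to the existing time.sleep() interruption case.
    if isinstance(exc, Py4JJavaError):
        cause = exc.java_exception
        while cause is not None:
            if cause.getClass().getName() == "java.lang.InterruptedException":
                return True
            cause = cause.getCause()
    return False
{code}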



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48676) Structured Logging Framework Scala Style Migration [Part 2]

2024-06-20 Thread Amanda Liu (Jira)
Amanda Liu created SPARK-48676:
--

 Summary: Structured Logging Framework Scala Style Migration [Part 
2]
 Key: SPARK-48676
 URL: https://issues.apache.org/jira/browse/SPARK-48676
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Amanda Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-48632) Remove unused LogKeys

2024-06-14 Thread Amanda Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amanda Liu resolved SPARK-48632.

Resolution: Not A Problem

> Remove unused LogKeys
> -
>
> Key: SPARK-48632
> URL: https://issues.apache.org/jira/browse/SPARK-48632
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Amanda Liu
>Priority: Major
>  Labels: pull-request-available
>
> Remove unused LogKey objects to clean up LogKey.scala



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48632) Remove unused LogKeys

2024-06-14 Thread Amanda Liu (Jira)
Amanda Liu created SPARK-48632:
--

 Summary: Remove unused LogKeys
 Key: SPARK-48632
 URL: https://issues.apache.org/jira/browse/SPARK-48632
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Amanda Liu


Remove unused LogKey objects to clean up LogKey.scala



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48628) Add task peak on/off heap execution memory metrics

2024-06-14 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-48628:


 Summary: Add task peak on/off heap execution memory metrics
 Key: SPARK-48628
 URL: https://issues.apache.org/jira/browse/SPARK-48628
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Ziqi Liu


Currently there are no task-level on/off-heap execution memory metrics. There is a 
[peakExecutionMemory|https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114]
 metric, but its semantics are a bit confusing: it only covers the execution memory used 
by shuffle/join/aggregate/sort, which is accumulated in specific operators.

 

We can easily maintain the whole task-level peak memory in TaskMemoryManager, 
assuming *acquireExecutionMemory* is the single narrow waist for acquiring 
execution memory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48623) Structured Logging Framework Scala Style Migration

2024-06-13 Thread Amanda Liu (Jira)
Amanda Liu created SPARK-48623:
--

 Summary: Structured Logging Framework Scala Style Migration
 Key: SPARK-48623
 URL: https://issues.apache.org/jira/browse/SPARK-48623
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Amanda Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48610) Remove ExplainUtils.processPlan synchronize

2024-06-12 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-48610:


 Summary: Remove ExplainUtils.processPlan synchronize
 Key: SPARK-48610
 URL: https://issues.apache.org/jira/browse/SPARK-48610
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Ziqi Liu


[https://github.com/apache/spark/pull/45282] introduced synchronization in 
`ExplainUtils.processPlan` to avoid a race condition when multiple queries refer to 
the same cached plan.

The granularity of the lock is too large. We can try to fix the root cause of this 
concurrency issue by refactoring away the mutable OP_ID_TAG, which does not fit the 
immutable nature of SparkPlan; instead, we can use an auxiliary id map with object 
identity as the key.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48592) Add scala style check for logging message inline variables

2024-06-11 Thread Amanda Liu (Jira)
Amanda Liu created SPARK-48592:
--

 Summary: Add scala style check for logging message inline variables
 Key: SPARK-48592
 URL: https://issues.apache.org/jira/browse/SPARK-48592
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Amanda Liu


Ban log messages passed to logInfo, logWarning, or logError that contain inline 
variables without {{MDC}} wrappers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48589) Add option snapshotStartBatchId and snapshotPartitionId to state data source

2024-06-11 Thread Yuchen Liu (Jira)
Yuchen Liu created SPARK-48589:
--

 Summary: Add option snapshotStartBatchId and snapshotPartitionId 
to state data source
 Key: SPARK-48589
 URL: https://issues.apache.org/jira/browse/SPARK-48589
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Yuchen Liu


Define two new options, _snapshotStartBatchId_ and _snapshotPartitionId_, for the 
existing state reader. Both of them should be provided at the same time (see the 
usage sketch after this list).
 # When there is no snapshot file at that batch (note there is an off-by-one 
difference between version and batch id), throw an exception.
 # Otherwise, the reader should rebuild the state starting from that snapshot by 
reading delta files only, ignoring all later snapshot files.
 # Note that if the batchId option is already specified, that batchId is the ending 
batch id, so we should stop at that batchId.
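
A rough usage sketch of the proposed options against the existing {{statestore}} 
reader (the checkpoint path and batch ids are placeholders):
{code:python}
# Rebuild partition 3 of the state starting from the snapshot written at
# batch 10, applying delta files up to the ending batch 15.
df = (
    spark.read.format("statestore")
    .option("path", "/checkpoints/my_query")   # existing option: checkpoint location
    .option("snapshotStartBatchId", 10)        # proposed option
    .option("snapshotPartitionId", 3)          # proposed option
    .option("batchId", 15)                     # existing option: ending batch
    .load()
)
{code}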



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48588) Fine-grained State Data Source

2024-06-11 Thread Yuchen Liu (Jira)
Yuchen Liu created SPARK-48588:
--

 Summary: Fine-grained State Data Source
 Key: SPARK-48588
 URL: https://issues.apache.org/jira/browse/SPARK-48588
 Project: Spark
  Issue Type: Epic
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Yuchen Liu


The current state reader API replays the state store rows from the latest snapshot 
plus any newer delta files. The issue with this mechanism is that sometimes the 
snapshot files could be wrongly constructed, or users want to know how the state 
changes across batches. We need to improve the state reader so that it can handle a 
variety of fine-grained requirements, for example reconstructing a state from an 
arbitrary snapshot, or supporting a CDC mode for state evolution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48567) Pyspark StreamingQuery lastProgress and friend should return actual StreamingQueryProgress

2024-06-07 Thread Wei Liu (Jira)
Wei Liu created SPARK-48567:
---

 Summary: Pyspark StreamingQuery lastProgress and friend should 
return actual StreamingQueryProgress
 Key: SPARK-48567
 URL: https://issues.apache.org/jira/browse/SPARK-48567
 Project: Spark
  Issue Type: New Feature
  Components: PySpark, SS
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48542) Give snapshotStartBatchId and snapshotPartitionId to the state data source

2024-06-05 Thread Yuchen Liu (Jira)
Yuchen Liu created SPARK-48542:
--

 Summary: Give snapshotStartBatchId and snapshotPartitionId to the 
state data source
 Key: SPARK-48542
 URL: https://issues.apache.org/jira/browse/SPARK-48542
 Project: Spark
  Issue Type: New Feature
  Components: SQL, Structured Streaming
Affects Versions: 4.0.0
 Environment: This should work for both HDFS state store and RocksDB 
state store.
Reporter: Yuchen Liu


Right now, to read a version of the state data, the state source will find the first 
snapshot file before the given version and reconstruct the state using the delta 
files. In some debugging scenarios, users need more granular control over how a 
given state is reconstructed; for example, they may want to start from a specific 
snapshot instead of the closest one. One use case is to find out whether a snapshot 
has been corrupted after committing.

This task introduces two options, {{snapshotStartBatchId}} and 
{{snapshotPartitionId}}, to the state data source. By specifying them, users can 
control the starting batch id of the snapshot and the partition id of the state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48482) dropDuplicates and dropDuplicatesWithinWatermark should accept varargs

2024-05-30 Thread Wei Liu (Jira)
Wei Liu created SPARK-48482:
---

 Summary: dropDuplicates and dropDuplicatesWithinWatermark should 
accept varargs
 Key: SPARK-48482
 URL: https://issues.apache.org/jira/browse/SPARK-48482
 Project: Spark
  Issue Type: New Feature
  Components: PySpark
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48480) StreamingQueryListener thread should not be interruptable

2024-05-30 Thread Wei Liu (Jira)
Wei Liu created SPARK-48480:
---

 Summary: StreamingQueryListener thread should not be interruptable
 Key: SPARK-48480
 URL: https://issues.apache.org/jira/browse/SPARK-48480
 Project: Spark
  Issue Type: New Feature
  Components: Connect, SS
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48468) Add LogicalQueryStage interface in catalyst

2024-05-29 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-48468:


 Summary: Add LogicalQueryStage interface in catalyst
 Key: SPARK-48468
 URL: https://issues.apache.org/jira/browse/SPARK-48468
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Ziqi Liu


Add `LogicalQueryStage` interface in catalyst so that it's visible in logical 
rules



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48465) Avoid no-op empty relation propagation in AQE

2024-05-29 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-48465:


 Summary: Avoid no-op empty relation propagation in AQE
 Key: SPARK-48465
 URL: https://issues.apache.org/jira/browse/SPARK-48465
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 4.0.0
Reporter: Ziqi Liu


We should avoid no-op empty relation propagation in AQE: if we convert an empty 
QueryStageExec to an empty relation, it will be further wrapped into a new query 
stage and executed, producing an empty result and triggering empty relation 
propagation again. This issue is currently not exposed because AQE will try to reuse 
the shuffle.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-48447) Check state store provider class before invoking the constructor

2024-05-28 Thread Yuchen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuchen Liu updated SPARK-48447:
---
Priority: Major  (was: Minor)

> Check state store provider class before invoking the constructor
> 
>
> Key: SPARK-48447
> URL: https://issues.apache.org/jira/browse/SPARK-48447
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Yuchen Liu
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We should restrict construction to classes 
> [extending|https://github.com/databricks/runtime/blob/1440e77ab54c40981066c22ec759bdafc0683e76/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L73]
>  {{StateStoreProvider}}, to prevent users from instantiating arbitrary classes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48447) Check state store provider class before invoking the constructor

2024-05-28 Thread Yuchen Liu (Jira)
Yuchen Liu created SPARK-48447:
--

 Summary: Check state store provider class before invoking the 
constructor
 Key: SPARK-48447
 URL: https://issues.apache.org/jira/browse/SPARK-48447
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Yuchen Liu


We should restrict construction to classes 
[extending|https://github.com/databricks/runtime/blob/1440e77ab54c40981066c22ec759bdafc0683e76/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L73]
 {{StateStoreProvider}}, to prevent users from instantiating arbitrary classes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48446) Update SS Doc of dropDuplicatesWithinWatermark to use the right syntax

2024-05-28 Thread Yuchen Liu (Jira)
Yuchen Liu created SPARK-48446:
--

 Summary: Update SS Doc of dropDuplicatesWithinWatermark to use the 
right syntax
 Key: SPARK-48446
 URL: https://issues.apache.org/jira/browse/SPARK-48446
 Project: Spark
  Issue Type: Documentation
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Yuchen Liu


For dropDuplicates, the example in 
[https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#:~:text=)%20%5C%0A%20%20.-,dropDuplicates,-(%22guid%22]
 is out of date compared with 
[https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.dropDuplicates.html].
 The argument should be a list of column names.

The same discrepancy exists for dropDuplicatesWithinWatermark.
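
For illustration, the guide's example would need to change along these lines (the 
column name {{guid}} is the one used in the guide; the streaming DataFrame and 
watermark setup are as in the guide's snippet):
{code:python}
# Outdated form shown in the programming guide:
#   streamingDf.dropDuplicates("guid")

# Form matching the current PySpark API, which expects a list of column names:
streamingDf.dropDuplicates(["guid"])
streamingDf.dropDuplicatesWithinWatermark(["guid"])
{code}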



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-48411) Add E2E test for DropDuplicateWithinWatermark

2024-05-24 Thread Yuchen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849356#comment-17849356
 ] 

Yuchen Liu commented on SPARK-48411:


I will work on this.

> Add E2E test for DropDuplicateWithinWatermark
> -
>
> Key: SPARK-48411
> URL: https://issues.apache.org/jira/browse/SPARK-48411
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, SS
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> Currently we do not have a e2e test for DropDuplicateWithinWatermark, we 
> should add one. We can simply use one of the test written in Scala here (with 
> the testStream API) and replicate it to python:
> [https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103]
>  
> The change should happen in 
> [https://github.com/apache/spark/blob/eee179135ed21dbdd8b342d053c9eda849e2de77/python/pyspark/sql/tests/streaming/test_streaming.py#L29]
>  
> so we can test it in both connect and non-connect.
>  
> Test with:
> ```
> python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming
> python/run-tests --testnames 
> pyspark.sql.tests.connect.streaming.test_parity_streaming
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-48411) Add E2E test for DropDuplicateWithinWatermark

2024-05-24 Thread Wei Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849348#comment-17849348
 ] 

Wei Liu commented on SPARK-48411:
-

Sorry I tagged the wrong Yuchen

> Add E2E test for DropDuplicateWithinWatermark
> -
>
> Key: SPARK-48411
> URL: https://issues.apache.org/jira/browse/SPARK-48411
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, SS
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> Currently we do not have a e2e test for DropDuplicateWithinWatermark, we 
> should add one. We can simply use one of the test written in Scala here (with 
> the testStream API) and replicate it to python:
> [https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103]
>  
> The change should happen in 
> [https://github.com/apache/spark/blob/eee179135ed21dbdd8b342d053c9eda849e2de77/python/pyspark/sql/tests/streaming/test_streaming.py#L29]
>  
> so we can test it in both connect and non-connect.
>  
> Test with:
> ```
> python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming
> python/run-tests --testnames 
> pyspark.sql.tests.connect.streaming.test_parity_streaming
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48411) Add E2E test for DropDuplicateWithinWatermark

2024-05-24 Thread Wei Liu (Jira)
Wei Liu created SPARK-48411:
---

 Summary: Add E2E test for DropDuplicateWithinWatermark
 Key: SPARK-48411
 URL: https://issues.apache.org/jira/browse/SPARK-48411
 Project: Spark
  Issue Type: New Feature
  Components: Connect, SS
Affects Versions: 4.0.0
Reporter: Wei Liu


Currently we do not have an e2e test for DropDuplicateWithinWatermark, so we should 
add one. We can simply take one of the tests written in Scala here (with the 
testStream API) and replicate it in Python:

[https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103]

 

The change should happen in 
[https://github.com/apache/spark/blob/eee179135ed21dbdd8b342d053c9eda849e2de77/python/pyspark/sql/tests/streaming/test_streaming.py#L29]

 

so we can test it in both connect and non-connect.

 

Test with:

```
python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming
python/run-tests --testnames 
pyspark.sql.tests.connect.streaming.test_parity_streaming
```
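
A rough structural sketch of such a test, assuming it lives in the test class of 
test_streaming.py so that the connect parity suite inherits it; unlike the Scala 
testStream case it does not feed explicit duplicate events, so it is only a 
smoke-level end-to-end check:
{code:python}
def test_drop_duplicates_within_watermark(self):
    # Build a small streaming query that exercises dropDuplicatesWithinWatermark
    # end to end; the rate source and memory sink are placeholders for whatever
    # the real test ends up using.
    df = (
        self.spark.readStream.format("rate")
        .option("rowsPerSecond", 10)
        .load()
        .withWatermark("timestamp", "10 seconds")
        .dropDuplicatesWithinWatermark(["value"])
    )
    query = df.writeStream.format("memory").queryName("dedup_smoke").start()
    try:
        query.processAllAvailable()
        self.assertTrue(query.isActive)
    finally:
        query.stop()
{code}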



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-48411) Add E2E test for DropDuplicateWithinWatermark

2024-05-24 Thread Wei Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849202#comment-17849202
 ] 

Wei Liu commented on SPARK-48411:
-

[~liuyuchen777] is going to work on this

> Add E2E test for DropDuplicateWithinWatermark
> -
>
> Key: SPARK-48411
> URL: https://issues.apache.org/jira/browse/SPARK-48411
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, SS
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> Currently we do not have a e2e test for DropDuplicateWithinWatermark, we 
> should add one. We can simply use one of the test written in Scala here (with 
> the testStream API) and replicate it to python:
> [https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103]
>  
> The change should happen in 
> [https://github.com/apache/spark/blob/eee179135ed21dbdd8b342d053c9eda849e2de77/python/pyspark/sql/tests/streaming/test_streaming.py#L29]
>  
> so we can test it in both connect and non-connect.
>  
> Test with:
> ```
> python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming
> python/run-tests --testnames 
> pyspark.sql.tests.connect.streaming.test_parity_streaming
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47946) Nested field's nullable value could be invalid after extracted using GetStructField

2024-05-15 Thread Linhong Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Linhong Liu resolved SPARK-47946.
-
Resolution: Not A Problem

> Nested field's nullable value could be invalid after extracted using 
> GetStructField
> ---
>
> Key: SPARK-47946
> URL: https://issues.apache.org/jira/browse/SPARK-47946
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 3.4.2
>Reporter: Junyoung Cho
>Priority: Major
>
> I've got error when append to table using DataFrameWriterV2.
> The error was occured in TableOutputResolver.checkNullability. This error 
> occurs when the data type of the schema is the same, but the order of the 
> fields is different.
> I found that GetStructField.nullable returns unexpected result.
> {code:java}
> override def nullable: Boolean = child.nullable || 
> childSchema(ordinal).nullable {code}
> Even if nested field has not nullability attribute, it returns true when 
> parent struct has nullability attribute.
> ||Parent nullability||Child nullability||Result||
> |true|true|true|
> |{color:#ff}true{color}|{color:#ff}false{color}|{color:#ff}true{color}|
> |{color:#172b4d}false{color}|{color:#172b4d}true{color}|{color:#172b4d}true{color}|
> |false|false|false|
>  
> I think the logic should be changed to get just child's nullability, because 
> both of parent and child should be nullable to be considered nullable.
>  
> {code:java}
> override def nullable: Boolean = childSchema(ordinal).nullable  {code}
>  
>  
>  
> I want to check current logic is reasonable, or my suggestion can occur other 
> side effect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-47946) Nested field's nullable value could be invalid after extracted using GetStructField

2024-05-15 Thread Linhong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-47946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846773#comment-17846773
 ] 

Linhong Liu commented on SPARK-47946:
-

No, it's not an issue.

Think about this:

 
||key||value (nullable=true)||
|a|{"x": 1, "y": 2}|
|b|null|
|c|{"x": null, "y": 3}|

Let's assume `value.y` cannot be null (i.e. nullable = false), and run `select 
value.y from tbl`. What's the result, and what's the nullability of this column? 
It should be:

 

 
||y||
|2|
|null|
|3|
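
A small PySpark illustration of this, assuming an active `spark` session (the field 
names are made up):
{code:python}
from pyspark.sql.types import StructType, StructField, StringType, LongType

inner = StructType([StructField("y", LongType(), nullable=False)])
schema = StructType([
    StructField("key", StringType(), nullable=False),
    StructField("value", inner, nullable=True),   # the struct itself is nullable
])
df = spark.createDataFrame([("a", (2,)), ("b", None)], schema)

# The row where `value` is null yields null for value.y, so the extracted column
# must be nullable even though the nested field is declared non-null.
print(df.select("value.y").schema["y"].nullable)  # True
{code}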

 

 

> Nested field's nullable value could be invalid after extracted using 
> GetStructField
> ---
>
> Key: SPARK-47946
> URL: https://issues.apache.org/jira/browse/SPARK-47946
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 3.4.2
>Reporter: Junyoung Cho
>Priority: Major
>
> I've got error when append to table using DataFrameWriterV2.
> The error was occured in TableOutputResolver.checkNullability. This error 
> occurs when the data type of the schema is the same, but the order of the 
> fields is different.
> I found that GetStructField.nullable returns unexpected result.
> {code:java}
> override def nullable: Boolean = child.nullable || 
> childSchema(ordinal).nullable {code}
> Even if nested field has not nullability attribute, it returns true when 
> parent struct has nullability attribute.
> ||Parent nullability||Child nullability||Result||
> |true|true|true|
> |{color:#ff}true{color}|{color:#ff}false{color}|{color:#ff}true{color}|
> |{color:#172b4d}false{color}|{color:#172b4d}true{color}|{color:#172b4d}true{color}|
> |false|false|false|
>  
> I think the logic should be changed to get just child's nullability, because 
> both of parent and child should be nullable to be considered nullable.
>  
> {code:java}
> override def nullable: Boolean = childSchema(ordinal).nullable  {code}
>  
>  
>  
> I want to check current logic is reasonable, or my suggestion can occur other 
> side effect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48241) CSV parsing failure with char/varchar type columns

2024-05-11 Thread Jiayi Liu (Jira)
Jiayi Liu created SPARK-48241:
-

 Summary: CSV parsing failure with char/varchar type columns
 Key: SPARK-48241
 URL: https://issues.apache.org/jira/browse/SPARK-48241
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.5.1
Reporter: Jiayi Liu
 Fix For: 4.0.0


A CSV table containing char or varchar columns will result in the following error 
when selecting from it:
{code:java}
java.lang.IllegalArgumentException: requirement failed: requiredSchema 
(struct) should be the subset of dataSchema 
(struct).
    at scala.Predef$.require(Predef.scala:281)
    at 
org.apache.spark.sql.catalyst.csv.UnivocityParser.(UnivocityParser.scala:56)
    at 
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:127)
    at 
org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
    at 
org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
    at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:231)
    at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)
    at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125){code}
The reason for the error is that the StringType columns in the dataSchema and 
requiredSchema of UnivocityParser are not consistent: the char/varchar metadata 
contained in the StringType StructField of the dataSchema is missing from the 
requiredSchema. We need to retain the metadata when resolving the schema.
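
A minimal reproduction sketch, assuming a plain CSV-backed table (the table name is 
a placeholder):
{code:python}
spark.sql("CREATE TABLE csv_varchar_tbl (name VARCHAR(10)) USING csv")
spark.sql("INSERT INTO csv_varchar_tbl VALUES ('abc')")

# On affected versions the scan fails with the requirement error above, because
# the char/varchar metadata on the StringType field is present in dataSchema
# but stripped from requiredSchema.
spark.table("csv_varchar_tbl").show()
{code}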
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48181) Unify StreamingPythonRunner and PythonPlannerRunner

2024-05-07 Thread Wei Liu (Jira)
Wei Liu created SPARK-48181:
---

 Summary: Unify StreamingPythonRunner and PythonPlannerRunner
 Key: SPARK-48181
 URL: https://issues.apache.org/jira/browse/SPARK-48181
 Project: Spark
  Issue Type: New Feature
  Components: Connect, SS
Affects Versions: 4.0.0
Reporter: Wei Liu


We should unify the two driver-side Python runners for PySpark. To do this, we 
should move away from StreamingPythonRunner and enhance PythonPlannerRunner with 
streaming support (a multiple read-write loop).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48147) Remove all client listeners when local spark session is deleted

2024-05-06 Thread Wei Liu (Jira)
Wei Liu created SPARK-48147:
---

 Summary: Remove all client listeners when local spark session is 
deleted
 Key: SPARK-48147
 URL: https://issues.apache.org/jira/browse/SPARK-48147
 Project: Spark
  Issue Type: New Feature
  Components: Connect, PySpark, SS
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48093) Add config to switch between client side listener and server side listener

2024-05-02 Thread Wei Liu (Jira)
Wei Liu created SPARK-48093:
---

 Summary: Add config to switch between client side listener and 
server side listener 
 Key: SPARK-48093
 URL: https://issues.apache.org/jira/browse/SPARK-48093
 Project: Spark
  Issue Type: New Feature
  Components: Connect, SS
Affects Versions: 3.5.1, 3.5.0, 3.5.2
Reporter: Wei Liu


We are moving the implementation of the streaming query listener from the server to 
the client. For clients already running the client-side listener, to prevent 
regressions, we should add a config that lets users decide which type of listener 
to use.

This is only added to the 3.5.x published versions. For 4.0 and upwards we only use 
the client-side listener.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48002) Add Observed metrics test in PySpark StreamingQueryListeners

2024-04-26 Thread Wei Liu (Jira)
Wei Liu created SPARK-48002:
---

 Summary: Add Observed metrics test in PySpark 
StreamingQueryListeners
 Key: SPARK-48002
 URL: https://issues.apache.org/jira/browse/SPARK-48002
 Project: Spark
  Issue Type: New Feature
  Components: SS
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47877) Speed up test_parity_listener

2024-04-16 Thread Wei Liu (Jira)
Wei Liu created SPARK-47877:
---

 Summary: Speed up test_parity_listener
 Key: SPARK-47877
 URL: https://issues.apache.org/jira/browse/SPARK-47877
 Project: Spark
  Issue Type: New Feature
  Components: Connect, SS
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-47718) .sql() does not recognize watermark defined upstream

2024-04-09 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-47718:

Labels:   (was: pull-request-available)

> .sql() does not recognize watermark defined upstream
> 
>
> Key: SPARK-47718
> URL: https://issues.apache.org/jira/browse/SPARK-47718
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.5.1
>Reporter: Chloe He
>Priority: Major
>
> I have a data pipeline set up in such a way that it reads data from a Kafka 
> source, does some transformation on the data using pyspark, then writes the 
> output into a sink (Kafka, Redis, etc).
>  
> My entire pipeline is written in SQL, so I wish to use the .sql() method to 
> execute SQL on my streaming source directly.
>  
> However, I'm running into the issue where my watermark is not being 
> recognized by the downstream query via the .sql() method.
>  
> ```
> Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:49:36) 
> [Clang 16.0.6 ] on darwin
> Type "help", "copyright", "credits" or "license" for more information.
> >>> import pyspark
> >>> print(pyspark.__version__)
> 3.5.1
> >>> from pyspark.sql import SparkSession
> >>>
> >>> session = SparkSession.builder \
> ...     .config("spark.jars.packages", 
> "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\
> ...     .getOrCreate()
> >>> from pyspark.sql.functions import col, from_json
> >>> from pyspark.sql.types import StructField, StructType, TimestampType, 
> >>> LongType, DoubleType, IntegerType
> >>> schema = StructType(
> ...     [
> ...         StructField('createTime', TimestampType(), True),
> ...         StructField('orderId', LongType(), True),
> ...         StructField('payAmount', DoubleType(), True),
> ...         StructField('payPlatform', IntegerType(), True),
> ...         StructField('provinceId', IntegerType(), True),
> ...     ])
> >>>
> >>> streaming_df = session.readStream\
> ...     .format("kafka")\
> ...     .option("kafka.bootstrap.servers", "localhost:9092")\
> ...     .option("subscribe", "payment_msg")\
> ...     .option("startingOffsets","earliest")\
> ...     .load()\
> ...     .select(from_json(col("value").cast("string"), 
> schema).alias("parsed_value"))\
> ...     .select("parsed_value.*")\
> ...     .withWatermark("createTime", "10 seconds")
> >>>
> >>> streaming_df.createOrReplaceTempView("streaming_df")
> >>> session.sql("""
> ... SELECT
> ...     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> ...     FROM streaming_df
> ...     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> ...     ORDER BY window.start
> ... """)\
> ...   .writeStream\
> ...   .format("kafka") \
> ...   .option("checkpointLocation", "checkpoint") \
> ...   .option("kafka.bootstrap.servers", "localhost:9092") \
> ...   .option("topic", "sink") \
> ...   .start()
> ```
>  
> This throws exception
> ```
> pyspark.errors.exceptions.captured.AnalysisException: Append output mode not 
> supported when there are streaming aggregations on streaming 
> DataFrames/DataSets without watermark; line 6 pos 4;
> ```
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47722) Wait until RocksDB background work finish before closing

2024-04-03 Thread Wei Liu (Jira)
Wei Liu created SPARK-47722:
---

 Summary: Wait until RocksDB background work finish before closing 
 Key: SPARK-47722
 URL: https://issues.apache.org/jira/browse/SPARK-47722
 Project: Spark
  Issue Type: New Feature
  Components: SS
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE

2024-03-25 Thread Danke Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danke Liu updated SPARK-47542:
--
Description: 
When I use Spark's JDBC source to pull data from Oracle, it will not hit the index 
if the pushed filter's column type in Oracle is DATE.

Here is my scenario:

First I create a DataFrame that reads from Oracle:

{code:java}
val df = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("user", user)
  .option("password", passwd)
  .option("dbtable", "select * from foobar.tbl1")
  .load()
{code}

Then I apply a filter to the DataFrame like this:

{code:java}
df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss')""").count()
{code}

This will not hit the index on the update_time column.

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp in 
Spark (because the precision of DATE in Oracle is seconds). When I push a filter to 
Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect:

{code:java}
override def compileValue(value: Any): Any = value match {
  // The JDBC drivers support date literals in SQL statements written in the
  // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
  // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
  // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
  // Appendix A Reference Information.
  case stringValue: String => s"'${escapeSql(stringValue)}'"
  case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
  case dateValue: Date => "{d '" + dateValue + "'}"
  case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
  case _ => value
}
{code}

As a result, the pushed condition becomes "update_time >= \{ts '2024-03-12 06:18:17'}", 
which will never hit the index.

In my case, as a workaround, I changed the code to this:

{code:java}
case timestampValue: Timestamp =>
  s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')"
{code}

After this modification, it worked well.

 

 

 

 

  was:
When I use Spark's JDBC source to pull data from Oracle, it will not hit the index 
if the pushed filter's column type in Oracle is DATE.

Here is my scenario:

First I created a DataFrame that reads from Oracle:

{code:java}
val df = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("user", user)
  .option("password", passwd)
  .option("dbtable", "select * from foobar.tbl1")
  .load()
{code}

Then I apply a filter to the DataFrame like this:

{code:java}
df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss')""").count()
{code}

This will not hit the index on the update_time column.

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp in 
Spark (because the precision of DATE in Oracle is seconds). When I push a filter to 
Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect:

{code:java}
override def compileValue(value: Any): Any = value match {
  // The JDBC drivers support date literals in SQL statements written in the
  // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
  // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
  // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
  // Appendix A Reference Information.
  case stringValue: String => s"'${escapeSql(stringValue)}'"
  case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
  case dateValue: Date => "{d '" + dateValue + "'}"
  case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
  case _ => value
}
{code}

As a result, the pushed condition becomes "update_time >= \{ts '2024-03-12 06:18:17'}", 
which will never hit the index.

In my case, as a workaround, I changed the code to this:

{code:java}
case timestampValue: Timestamp =>
  s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')"
{code}

After this

[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE

2024-03-25 Thread Danke Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danke Liu updated SPARK-47542:
--
Description: 
When I use Spark's JDBC source to pull data from Oracle, it will not hit the index 
if the pushed filter's column type in Oracle is DATE.

Here is my scenario:

First I created a DataFrame that reads from Oracle:

{code:java}
val df = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("user", user)
  .option("password", passwd)
  .option("dbtable", "select * from foobar.tbl1")
  .load()
{code}

Then I apply a filter to the DataFrame like this:

{code:java}
df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss')""").count()
{code}

This will not hit the index on the update_time column.

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp in 
Spark (because the precision of DATE in Oracle is seconds). When I push a filter to 
Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect:

{code:java}
override def compileValue(value: Any): Any = value match {
  // The JDBC drivers support date literals in SQL statements written in the
  // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
  // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
  // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
  // Appendix A Reference Information.
  case stringValue: String => s"'${escapeSql(stringValue)}'"
  case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
  case dateValue: Date => "{d '" + dateValue + "'}"
  case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
  case _ => value
}
{code}

As a result, the pushed condition becomes "update_time >= \{ts '2024-03-12 06:18:17'}", 
which will never hit the index.

In my case, as a workaround, I changed the code to this:

{code:java}
case timestampValue: Timestamp =>
  s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')"
{code}

After this modification, it worked well.

 

 

 

 

  was:
When I use Spark's JDBC source to pull data from Oracle, it will not hit the index 
if the pushed filter's column type in Oracle is DATE.

Here is my scenario:

First I created a DataFrame that reads from Oracle:

{code:java}
val df = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("user", user)
  .option("password", passwd)
  .option("dbtable", "select * from foobar.tbl1")
  .load()
{code}

Then I apply a filter to the DataFrame like this:

{code:java}
df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss')""").count()
{code}

This will not hit the index on the update_time column.

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp in 
Spark (because the precision of DATE in Oracle is seconds). When I push a filter to 
Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect:

{code:java}
override def compileValue(value: Any): Any = value match {
  // The JDBC drivers support date literals in SQL statements written in the
  // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
  // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
  // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
  // Appendix A Reference Information.
  case stringValue: String => s"'${escapeSql(stringValue)}'"
  case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
  case dateValue: Date => "{d '" + dateValue + "'}"
  case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
  case _ => value
}
{code}

As a result, the pushed condition becomes "update_time >= \{ts '2024-03-12 06:18:17'}", 
which will never hit the index.

In my case, as a workaround, I changed the code to this:

{code:java}
case timestampValue: Timestamp =>
  s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')"
{code}

After

[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE

2024-03-25 Thread Danke Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danke Liu updated SPARK-47542:
--
Description: 
When I use Spark's JDBC source to pull data from Oracle, it will not hit the index 
if the pushed filter's column type in Oracle is DATE.

Here is my scenario:

First I created a DataFrame that reads from Oracle:

{code:java}
val df = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("user", user)
  .option("password", passwd)
  .option("dbtable", "select * from foobar.tbl1")
  .load()
{code}

Then I apply a filter to the DataFrame like this:

{code:java}
df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss')""").count()
{code}

This will not hit the index on the update_time column.

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp in 
Spark (because the precision of DATE in Oracle is seconds). When I push a filter to 
Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect:

{code:java}
override def compileValue(value: Any): Any = value match {
  // The JDBC drivers support date literals in SQL statements written in the
  // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
  // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
  // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
  // Appendix A Reference Information.
  case stringValue: String => s"'${escapeSql(stringValue)}'"
  case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
  case dateValue: Date => "{d '" + dateValue + "'}"
  case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
  case _ => value
}
{code}

As a result, the pushed condition becomes "update_time >= \{ts '2024-03-12 06:18:17'}", 
which will never hit the index.

In my case, as a workaround, I changed the code to this:

{code:java}
case timestampValue: Timestamp =>
  s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')"
{code}

After this modification, it worked well.

 

 

 

 

  was:
When I use Spark's JDBC source to pull data from Oracle, it will not hit the index 
if the pushed filter's column type in Oracle is DATE.

Here is my scenario:

First I created a DataFrame that reads from Oracle:

{code:java}
val df = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("user", user)
  .option("password", passwd)
  .option("dbtable", "select * from foobar.tbl1")
  .load()
{code}

Then I apply a filter to the DataFrame like this:

{code:java}
df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss')""").count()
{code}

This will not hit the index on the update_time column.

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp in 
Spark (because the precision of DATE in Oracle is seconds). When I push a filter to 
Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect:

{code:java}
// class is org.apache.spark.sql.jdbc.OracleDialect
override def compileValue(value: Any): Any = value match {
  // The JDBC drivers support date literals in SQL statements written in the
  // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
  // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
  // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
  // Appendix A Reference Information.
  case stringValue: String => s"'${escapeSql(stringValue)}'"
  case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
  case dateValue: Date => "{d '" + dateValue + "'}"
  case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
  case _ => value
}
{code}

As a result, the pushed condition becomes "update_time >= \{ts '2024-03-12 06:18:17'}", 
which will never hit the index.

In my case, as a workaround, I changed the code to this:

case timestampValue: Timestamp

[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE

2024-03-25 Thread Danke Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danke Liu updated SPARK-47542:
--
Description: 
When I use Spark's JDBC source to pull data from Oracle, it will not hit the index 
if the pushed filter's column type in Oracle is DATE.

Here is my scenario:

First I created a DataFrame that reads from Oracle:

{code:java}
val df = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", driver)
  .option("user", user)
  .option("password", passwd)
  .option("dbtable", "select * from foobar.tbl1")
  .load()
{code}

Then I apply a filter to the DataFrame like this:

{code:java}
df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', 'yyyy-MM-dd HH:mm:ss')""").count()
{code}

This will not hit the index on the update_time column.

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp in 
Spark (because the precision of DATE in Oracle is seconds). When I push a filter to 
Oracle, it triggers the following code in org.apache.spark.sql.jdbc.OracleDialect:

{code:java}
// class is org.apache.spark.sql.jdbc.OracleDialect
override def compileValue(value: Any): Any = value match {
  // The JDBC drivers support date literals in SQL statements written in the
  // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
  // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
  // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
  // Appendix A Reference Information.
  case stringValue: String => s"'${escapeSql(stringValue)}'"
  case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
  case dateValue: Date => "{d '" + dateValue + "'}"
  case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
  case _ => value
}
{code}

As a result, the pushed condition becomes "update_time >= \{ts '2024-03-12 06:18:17'}", 
which will never hit the index.

In my case, as a workaround, I changed the code to this:

{code:java}
case timestampValue: Timestamp =>
  s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')"
{code}

After this modification, it worked well.

 

 

 

 

  was:
When I use spark's jdbc to pull data from oracle, it will not hit the index if 
the pushed filter's type in oralce is DATE.

Here is my scenario:

first I created  a dataframe that reads from oracle:

val df = spark.read.format("jdbc").                                             
                           
          option("url", url).                                                   
                       
          option("driver", driver).                                             
                       
          option("user", user).                                                 
                  
          option("password", passwd).                                           
                                                      
          option("dbtable", "select * from foobar.tbl1") 
          .load()

then I apply a filter to the dataframe like this:

df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd 
HH:mm:ss')  """).count()

this will not hit the index on update_time column.

 

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp 
in Spark (because the precision of DATE in Oracle is second). When I push a 
filter to Oracle, it triggers the following code in 
org.apache.spark.sql.jdbc.OracleDialect:

// class is org.apache.spark.sql.jdbc.OracleDialect

override def compileValue(value: Any): Any = value match {
    // The JDBC drivers support date literals in SQL statements written in the
    // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
    // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
    // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
    // Appendix A Reference Information.
    case stringValue: String => s"'${escapeSql(stringValue)}'"
    case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
    case dateValue: Date => "{d '" + dateValue + "'}"
    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
    case _ => value
  }

 

As a result, the condition "update_time >= {ts '2024-03-12 06:18:17'}" will
never hit the index.

In my case, as a workaround, I changed the code to this:

case timestampValue: Timestamp

[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE

2024-03-25 Thread Danke Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danke Liu updated SPARK-47542:
--
Description: 
When I use Spark's JDBC to pull data from Oracle, it will not hit the index if
the pushed filter's column type in Oracle is DATE.

Here is my scenario:

first I created  a dataframe that reads from oracle:

val df = spark.read.format("jdbc").
          option("url", url).
          option("driver", driver).
          option("user", user).
          option("password", passwd).
          option("dbtable", "select * from foobar.tbl1")
          .load()

then I apply a filter to the dataframe like this:

df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd 
HH:mm:ss')  """).count()

this will not hit the index on update_time column.

 

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp 
in Spark (because the precision of DATE in Oracle is second). When I push a 
filter to Oracle, it triggers the following code in 
org.apache.spark.sql.jdbc.OracleDialect:

// class is org.apache.spark.sql.jdbc.OracleDialect

override def compileValue(value: Any): Any = value match {
    // The JDBC drivers support date literals in SQL statements written in the
    // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
    // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
    // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
    // Appendix A Reference Information.
    case stringValue: String => s"'${escapeSql(stringValue)}'"
    case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
    case dateValue: Date => "{d '" + dateValue + "'}"
    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
    case _ => value
  }

 

As a result, the condition "update_time >= {ts '2024-03-12 06:18:17'}" will
never hit the index.

In my case, as a workaround, I changed the code to this:

case timestampValue: Timestamp =>
  s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')"

 

then it worked well.

 

 

 

 

  was:
When I use Spark's JDBC to pull data from Oracle, it will not hit the index if
the pushed filter's column type in Oracle is DATE.

Here is my scenario:

first I created  a dataframe that reads from oracle:

val df = spark.read.format("jdbc").
          option("url", url).
          option("driver", driver).
          option("user", user).
          option("password", passwd).
          option("dbtable", "select * from foobar.tbl1")
          .load()

then I apply a filter to the dataframe like this:

df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd 
HH:mm:ss')  """).count()

this will not hit the index on update_time column.

 

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp 
in Spark (because the precision of DATE in Oracle is second). When I push a 
filter to Oracle, it triggers the following code in 
org.apache.spark.sql.jdbc.OracleDialect:

// class is org.apache.spark.sql.jdbc.OracleDialect

override def compileValue(value: Any): Any = value match {
    // The JDBC drivers support date literals in SQL statements written in the
    // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
    // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
    // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
    // Appendix A Reference Information.
    case stringValue: String => s"'${escapeSql(stringValue)}'"
    case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
    case dateValue: Date => "{d '" + dateValue + "'}"
    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
    case _ => value
  }

 

and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index.

In my case, as a work around, I just change the code to this:

case timestampValue: Timestamp

[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE

2024-03-25 Thread Danke Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danke Liu updated SPARK-47542:
--
Description: 
When I use Spark's JDBC to pull data from Oracle, it will not hit the index if
the pushed filter's column type in Oracle is DATE.

Here is my scenario:

first I created  a dataframe that reads from oracle:

val df = spark.read.format("jdbc").
          option("url", url).
          option("driver", driver).
          option("user", user).
          option("password", passwd).
          option("dbtable", "select * from foobar.tbl1")
          .load()

then I apply a filter to the dataframe like this:

df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd 
HH:mm:ss')  """).count()

this will not hit the index on update_time column.

 

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp 
in Spark (because the precision of DATE in Oracle is second). When I push a 
filter to Oracle, it triggers the following code in 
org.apache.spark.sql.jdbc.OracleDialect:

// class is org.apache.spark.sql.jdbc.OracleDialect

override def compileValue(value: Any): Any = value match {
    // The JDBC drivers support date literals in SQL statements written in the
    // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
    // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
    // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
    // Appendix A Reference Information.
    case stringValue: String => s"'${escapeSql(stringValue)}'"
    case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
    case dateValue: Date => "{d '" + dateValue + "'}"
    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
    case _ => value
  }

 

and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index.

In my case, as a work around, I just change the code to this:

case timestampValue: Timestamp =>
  s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')"

 

then it worked well.

 

 

 

 

  was:
When I use Spark's JDBC to pull data from Oracle, it will not hit the index if
the pushed filter's column type in Oracle is DATE.

Here is my scenario:

first I created  a dataframe that reads from oracle:

val df = spark.read.format("jdbc").
          option("url", url).
          option("driver", driver).
          option("user", user).
          option("password", passwd).
          option("dbtable", "select * from foobar.tbl1")
          .load()

then I apply a filter to the dataframe like this:

df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd 
HH:mm:ss')  """).count()

this will not hit the index on update_time column.

 

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp
in Spark (because the precision of DATE in Oracle is one second), and when I
pushed a filter down to Oracle, it hit the code below:

// class is org.apache.spark.sql.jdbc.OracleDialect

override def compileValue(value: Any): Any = value match {
    // The JDBC drivers support date literals in SQL statements written in the
    // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
    // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
    // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
    // Appendix A Reference Information.
    case stringValue: String => s"'${escapeSql(stringValue)}'"
    case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
    case dateValue: Date => "{d '" + dateValue + "'}"
    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
    case _ => value
  }

 

and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index.

In my case, as a work around, I just change the code to this:

case timestampValue: Timestamp

[jira] [Updated] (SPARK-47542) spark cannot hit oracle's index when column type is DATE

2024-03-25 Thread Danke Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danke Liu updated SPARK-47542:
--
Description: 
When I use Spark's JDBC to pull data from Oracle, it will not hit the index if
the pushed filter's column type in Oracle is DATE.

Here is my scenario:

first I created  a dataframe that reads from oracle:

val df = spark.read.format("jdbc").
          option("url", url).
          option("driver", driver).
          option("user", user).
          option("password", passwd).
          option("dbtable", "select * from foobar.tbl1")
          .load()

then I apply a filter to the dataframe like this:

df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd 
HH:mm:ss')  """).count()

this will not hit the index on update_time column.

 

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp
in Spark (because the precision of DATE in Oracle is one second), and when I
pushed a filter down to Oracle, it hit the code below:

// class is org.apache.spark.sql.jdbc.OracleDialect

override def compileValue(value: Any): Any = value match {
    // The JDBC drivers support date literals in SQL statements written in the
    // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
    // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
    // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
    // Appendix A Reference Information.
    case stringValue: String => s"'${escapeSql(stringValue)}'"
    case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
    case dateValue: Date => "{d '" + dateValue + "'}"
    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
    case _ => value
  }

 

and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index.

In my case, as a work around, I just change the code to this:

case timestampValue: Timestamp =>
  s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')"

 

then it worked well.

 

 

 

 

  was:
When I use Spark's JDBC to pull data from Oracle, it will not hit the index if
the pushed filter's column type in Oracle is DATE.

Here is my scenario:

first I created a dataframe that reads from Oracle:

val df = spark.read.format("jdbc").
          option("url", url).
          option("driver", driver).
          option("user", user).
          option("password", passwd).
          option("dbtable", "select * from foobar.tbl1")
          .load()

then I pushed a filter to the dataframe like this:

df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd 
HH:mm:ss')  """).count()

this will not hit the index on update_time column.

 

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp
in Spark (because the precision of DATE in Oracle is one second), and when I
pushed a filter down to Oracle, it hit the code below:

// class is org.apache.spark.sql.jdbc.OracleDialect

override def compileValue(value: Any): Any = value match {
    // The JDBC drivers support date literals in SQL statements written in the
    // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
    // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
    // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
    // Appendix A Reference Information.
    case stringValue: String => s"'${escapeSql(stringValue)}'"
    case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
    case dateValue: Date => "{d '" + dateValue + "'}"
    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
    case _ => value
  }

 

and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index.

In my case, as a work around, I just change the code to this:

case timestampValue: Timestamp

[jira] [Created] (SPARK-47542) spark cannot hit oracle's index when column type is DATE

2024-03-25 Thread Danke Liu (Jira)
Danke Liu created SPARK-47542:
-

 Summary: spark cannot hit oracle's index when column type is DATE
 Key: SPARK-47542
 URL: https://issues.apache.org/jira/browse/SPARK-47542
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.2.4
Reporter: Danke Liu


When I use Spark's JDBC to pull data from Oracle, it will not hit the index if
the pushed filter's column type in Oracle is DATE.

Here is my scenario:

first I created a dataframe that reads from Oracle:

val df = spark.read.format("jdbc").
          option("url", url).
          option("driver", driver).
          option("user", user).
          option("password", passwd).
          option("dbtable", "select * from foobar.tbl1")
          .load()

then I pushed a filter to the dataframe like this:

df.filter("""`update_time` >= to_date('2024-03-12 06:18:17', '-MM-dd 
HH:mm:ss')  """).count()

this will not hit the index on update_time column.

 

Reason:

The update_time column in Oracle is of type DATE, which is mapped to Timestamp
in Spark (because the precision of DATE in Oracle is one second), and when I
pushed a filter down to Oracle, it hit the code below:

// class is org.apache.spark.sql.jdbc.OracleDialect

override def compileValue(value: Any): Any = value match {
    // The JDBC drivers support date literals in SQL statements written in the
    // format: {d 'yyyy-mm-dd'} and timestamp literals in SQL statements written
    // in the format: {ts 'yyyy-mm-dd hh:mm:ss.f...'}. For details, see
    // 'Oracle Database JDBC Developer’s Guide and Reference, 11g Release 1 (11.1)'
    // Appendix A Reference Information.
    case stringValue: String => s"'${escapeSql(stringValue)}'"
    case timestampValue: Timestamp => "{ts '" + timestampValue + "'}"
    case dateValue: Date => "{d '" + dateValue + "'}"
    case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ")
    case _ => value
  }

 

and this "update_time >= {ts '2024-03-12 06:18:17'}" will never hit the index.

In my case, as a work around, I just change the code to this:

case timestampValue: Timestamp =>
  s"to_date('${dateFormat.format(timestampValue)}', 'yyyy-MM-dd HH:mi:ss')"

 

then it worked well.

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47332) StreamingPythonRunner don't need redundant logic for starting python process

2024-03-08 Thread Wei Liu (Jira)
Wei Liu created SPARK-47332:
---

 Summary: StreamingPythonRunner don't need redundant logic for 
starting python process
 Key: SPARK-47332
 URL: https://issues.apache.org/jira/browse/SPARK-47332
 Project: Spark
  Issue Type: New Feature
  Components: Connect, SS, Structured Streaming
Affects Versions: 4.0.0
Reporter: Wei Liu


https://github.com/apache/spark/pull/45023#discussion_r1516609093



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47292) safeMapToJValue should consider when map is null

2024-03-05 Thread Wei Liu (Jira)
Wei Liu created SPARK-47292:
---

 Summary: safeMapToJValue should consider when map is null
 Key: SPARK-47292
 URL: https://issues.apache.org/jira/browse/SPARK-47292
 Project: Spark
  Issue Type: New Feature
  Components: Connect, SS
Affects Versions: 3.5.1, 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47277) PySpark util function assertDataFrameEqual should not support streaming DF

2024-03-04 Thread Wei Liu (Jira)
Wei Liu created SPARK-47277:
---

 Summary: PySpark util function assertDataFrameEqual should not 
support streaming DF
 Key: SPARK-47277
 URL: https://issues.apache.org/jira/browse/SPARK-47277
 Project: Spark
  Issue Type: New Feature
  Components: Connect, PySpark, SQL, Structured Streaming
Affects Versions: 3.5.1, 3.5.0, 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-27 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
The AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it will show the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how to fix this yet:
 * maybe we just add a coarse-granularity lock in explain?
 * make innerChildren a function: clone the initial plan, and on every access
check whether the original AQE plan is finalized (making the final flag atomic
first, of course); if not, return the cloned initial plan, and if it is
finalized, clone the final plan and return that one. This still won't reflect
the AQE plan in real time in a concurrent situation, but at least we have an
initial version and a final version (see the sketch after this list).
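
A minimal, self-contained sketch of that second idea. All names here are
placeholders for illustration only; the real change would live inside
AdaptiveSparkPlanExec, where `Plan` would be SparkPlan:
{code:scala}
import java.util.concurrent.atomic.AtomicBoolean

// Illustration only: `Plan`, markFinal and planForExplain are made-up names.
final class ExplainablePlan[Plan](initialPlan: Plan, clonePlan: Plan => Plan) {
  private val finalized = new AtomicBoolean(false)
  @volatile private var finalPlan: Plan = initialPlan

  // Called once by the execution path when AQE finishes re-optimizing.
  def markFinal(plan: Plan): Unit = {
    finalPlan = plan
    finalized.set(true)
  }

  // Called by explain(): never hands out the mutable in-flight plan, only a
  // clone of either the initial plan or, once available, the final plan.
  def planForExplain(): Plan =
    if (finalized.get()) clonePlan(finalPlan) else clonePlan(initialPlan)
}
{code}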

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.filter("key > 10")
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Filter (isnotnull(key#4L) AND (key#4L > 10))
   +- TableCacheQueryStage 0
      +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), 
(key#4L > 10)]
            +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, 
memory, deserialized, 1 replicas)
                  +- AdaptiveSparkPlan isFinalPlan=false
                     +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                        +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=24]
                           +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                              +- Project [(id#2L % 100) AS key#4L]
                                 +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   Filter (isnotnull(key#4L) AND (key#4L > 10))
   +- InMemoryTableScan [key#4L, count(key)#10L], [isnotnull(key#4L), (key#4L > 
10)]
         +- InMemoryRelation [key#4L, count(key)#10L], StorageLevel(disk, 
memory, deserialized, 1 replicas)
               +- AdaptiveSparkPlan isFinalPlan=false
                  +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                     +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=24]
                        +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                           +- Project [(id#2L % 100) AS key#4L]
                              +- Range (0, 1000, step=1, splits=10){code}

  was:
The AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it will show the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how to fix this yet:
 * maybe we just add a coarse-granularity lock in explain?
 * make innerChildren a function: clone the initial plan, and on every access
check whether the original AQE plan is finalized (making the final flag atomic
first, of course); if not, return the cloned initial plan, and if it is
finalized, clone the final plan and return that one. This still won't reflect
the AQE plan in real time in a concurrent situation, but at least we have an
initial version and a final version.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
             

[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-27 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
The AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it will show the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how to fix this yet:
 * maybe we just add a coarse-granularity lock in explain?
 * make innerChildren a function: clone the initial plan, and on every access
check whether the original AQE plan is finalized (making the final flag atomic
first, of course); if not, return the cloned initial plan, and if it is
finalized, clone the final plan and return that one. This still won't reflect
the AQE plan in real time in a concurrent situation, but at least we have an
initial version and a final version.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS 
key#4L]
                                                +- Range (0, 1000, step=1, 
splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10) 
{code}

  was:
The AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it will show the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how to fix this yet; maybe we just add a
coarse-granularity lock in explain?

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- 

[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-27 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
The AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it will show the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans.

*In short, the plan used to execute and the plan used to explain are not the
same instance, thus causing the inconsistency.*

 

I don't have a clear idea how to fix this yet; maybe we just add a
coarse-granularity lock in explain?

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS 
key#4L]
                                                +- Range (0, 1000, step=1, 
splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10) 
{code}

  was:
The AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it will show the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans. I don't have a clear idea
how to fix this yet; maybe we can check whether the AQE plan is finalized
(making the final flag atomic first, of course): if it is not, we can return the
cloned one, otherwise it's thread-safe to return the final one, since it is
immutable.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                 

[jira] [Updated] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-26 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-47177:
-
Description: 
The AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it will show the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans. I don't have a clear idea
how to fix this yet; maybe we can check whether the AQE plan is finalized
(making the final flag atomic first, of course): if it is not, we can return the
cloned one, otherwise it's thread-safe to return the final one, since it is
immutable.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS 
key#4L]
                                                +- Range (0, 1000, step=1, 
splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10) 
{code}

  was:
AQE plan is expected to display final plan after execution. This is not true 
for cached SQL plan: it will show the initial plan instead. This behavior 
change is introduced in [https://github.com/apache/spark/pull/40812] it tried 
to fix the concurrency issue with cached plan.  I don't have a clear idea how 
yet, maybe we can check whether the AQE plan is finalized(make the final flag 
atomic first, of course), if not we can return the cloned one, otherwise it's 
thread-safe to return the final one, since it's immutable.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, 
count(key2)=10), Row(key2=8, count(key2)=10)]
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 

[jira] [Created] (SPARK-47177) Cached SQL plan do not display final AQE plan in explain string

2024-02-26 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-47177:


 Summary: Cached SQL plan do not display final AQE plan in explain 
string
 Key: SPARK-47177
 URL: https://issues.apache.org/jira/browse/SPARK-47177
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.5.1, 3.5.0, 3.4.2, 4.0.0, 3.5.2
Reporter: Ziqi Liu


The AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it will show the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans. I don't have a clear idea
how to fix this yet; maybe we can check whether the AQE plan is finalized
(making the final flag atomic first, of course): if it is not, we can return the
cloned one, otherwise it's thread-safe to return the final one, since it is
immutable.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, 
count(key2)=10), Row(key2=8, count(key2)=10)]
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS 
key#4L]
                                                +- Range (0, 1000, step=1, 
splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10) 
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47174) Client Side Listener - Server side implementation

2024-02-26 Thread Wei Liu (Jira)
Wei Liu created SPARK-47174:
---

 Summary: Client Side Listener - Server side implementation
 Key: SPARK-47174
 URL: https://issues.apache.org/jira/browse/SPARK-47174
 Project: Spark
  Issue Type: Improvement
  Components: Connect, SS
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-47174) Client Side Listener - Server side implementation

2024-02-26 Thread Wei Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-47174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820828#comment-17820828
 ] 

Wei Liu commented on SPARK-47174:
-

I'm working on this.

> Client Side Listener - Server side implementation
> -
>
> Key: SPARK-47174
> URL: https://issues.apache.org/jira/browse/SPARK-47174
> Project: Spark
>  Issue Type: Improvement
>  Components: Connect, SS
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47173) fix typo in new streaming query listener explanation

2024-02-26 Thread Wei Liu (Jira)
Wei Liu created SPARK-47173:
---

 Summary: fix typo in new streaming query listener explanation
 Key: SPARK-47173
 URL: https://issues.apache.org/jira/browse/SPARK-47173
 Project: Spark
  Issue Type: Improvement
  Components: SS, UI
Affects Versions: 4.0.0
Reporter: Wei Liu


Misspelled flatMapGroupsWithState as flatMapGroupWithState (missing an "s"
after Group).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-46995) Allow AQE coalesce final stage in SQL cached plan

2024-02-06 Thread Ziqi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ziqi Liu updated SPARK-46995:
-
Component/s: SQL

> Allow AQE coalesce final stage in SQL cached plan
> -
>
> Key: SPARK-46995
> URL: https://issues.apache.org/jira/browse/SPARK-46995
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, SQL
>Affects Versions: 4.0.0
>Reporter: Ziqi Liu
>Priority: Major
>
> [https://github.com/apache/spark/pull/43435] and 
> [https://github.com/apache/spark/pull/43760] are fixing a correctness issue 
> which will be triggered when AQE applied on cached query plan, specifically, 
> when AQE coalescing the final result stage of the cached plan.
>  
> The current semantic of 
> {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}}
> ([source 
> code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411]):
>  * when true, we enable AQE, but disable coalescing final stage 
> ({*}default{*})
>  * when false, we disable AQE
>  
> But let’s revisit the semantic of this config: actually for caller the only 
> thing that matters is whether we change the output partitioning of the cached 
> plan. And we should only try to apply AQE if possible.  Thus we want to 
> modify the semantic of 
> {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}}
>  * when true, we enable AQE and allow coalescing final: this might lead to 
> perf regression, because it introduce extra shuffle
>  * when false, we enable AQE, but disable coalescing final stage. *(this is 
> actually the `true` semantic of old behavior)*
> Also, to keep the default behavior unchanged, we might want to flip the 
> default value of 
> {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} to `false`
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46995) Allow AQE coalesce final stage in SQL cached plan

2024-02-06 Thread Ziqi Liu (Jira)
Ziqi Liu created SPARK-46995:


 Summary: Allow AQE coalesce final stage in SQL cached plan
 Key: SPARK-46995
 URL: https://issues.apache.org/jira/browse/SPARK-46995
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Ziqi Liu


[https://github.com/apache/spark/pull/43435] and
[https://github.com/apache/spark/pull/43760] fix a correctness issue that is
triggered when AQE is applied to a cached query plan, specifically when AQE
coalesces the final result stage of the cached plan.

 

The current semantic of 
{{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}}

([source 
code|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411]):
 * when true, we enable AQE, but disable coalescing final stage ({*}default{*})

 * when false, we disable AQE

 

But let's revisit the semantics of this config: for the caller, the only thing
that matters is whether we change the output partitioning of the cached plan,
and we should only try to apply AQE if possible. Thus we want to modify the
semantics of {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}}:
 * when true, we enable AQE and allow coalescing the final stage: this might
lead to a perf regression, because it introduces an extra shuffle

 * when false, we enable AQE, but disable coalescing the final stage. *(this is
actually the `true` semantics of the old behavior)*

Also, to keep the default behavior unchanged, we might want to flip the default 
value of {{spark.sql.optimizer.canChangeCachedPlanOutputPartitioning}} to 
`false`
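
For illustration, under the proposed semantics a caller who accepts the possible
extra shuffle would opt in explicitly. A rough spark-shell sketch, assuming
`spark` is in scope and that the config can still be set at runtime:
{code:scala}
import org.apache.spark.sql.functions.expr

// Proposed `true` semantics: AQE may coalesce the final stage of the cached
// plan, which can change its output partitioning.
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")

val cached = spark.range(1000)
  .withColumn("key", expr("id % 100"))
  .groupBy("key")
  .count()
  .cache()

// The final stage of the cached plan may now be coalesced by AQE.
cached.filter("key > 10").collect()
{code}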

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46910) Eliminate JDK Requirement in PySpark Installation

2024-01-29 Thread Amanda Liu (Jira)
Amanda Liu created SPARK-46910:
--

 Summary: Eliminate JDK Requirement in PySpark Installation
 Key: SPARK-46910
 URL: https://issues.apache.org/jira/browse/SPARK-46910
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 3.5.0
Reporter: Amanda Liu


PySpark requires users to have the correct JDK version (JDK 8+ for Spark<4; JDK 
17+ for Spark>=4) installed locally.

We can make the Spark installation script install the JDK, so users don’t need 
to do this step manually.
h1. Details
 # When the entry point for a Spark class is invoked, the spark-class script 
checks if Java is installed in the user environment.

 # If Java is not installed, the user is prompted to select whether they want 
to install JDK 17.

 # If the user selects yes, JDK 17 is installed (using the [install-jdk 
library|https://pypi.org/project/install-jdk/]) and JAVA_HOME variable and 
RUNNER are set appropriately. The Spark build will now work!

 # If the user selects no, we provide them a brief description of how to 
install JDK manually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46873) PySpark spark.streams should not recreate new StreamingQueryManager

2024-01-25 Thread Wei Liu (Jira)
Wei Liu created SPARK-46873:
---

 Summary: PySpark spark.streams should not recreate new 
StreamingQueryManager
 Key: SPARK-46873
 URL: https://issues.apache.org/jira/browse/SPARK-46873
 Project: Spark
  Issue Type: Task
  Components: Connect, PySpark, SS
Affects Versions: 4.0.0
Reporter: Wei Liu


In Scala, there is only one streaming query manager for one spark session:

```

scala> spark.streams

val *res0*: *org.apache.spark.sql.streaming.StreamingQueryManager* = 
org.apache.spark.sql.streaming.StreamingQueryManager@46bb8cba

 

scala> spark.streams

val *res1*: *org.apache.spark.sql.streaming.StreamingQueryManager* = 
org.apache.spark.sql.streaming.StreamingQueryManager@46bb8cba

 

scala> spark.streams

val *res2*: *org.apache.spark.sql.streaming.StreamingQueryManager* = 
org.apache.spark.sql.streaming.StreamingQueryManager@46bb8cba

 

scala> spark.streams

val *res3*: *org.apache.spark.sql.streaming.StreamingQueryManager* = 
org.apache.spark.sql.streaming.StreamingQueryManager@46bb8cba

```

 

In Python, this is currently false:

```

>>> spark.streams



>>> spark.streams



>>> spark.streams



>>> spark.streams



```

 

Python should align with the Scala behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-44460) Pass user auth credential to Python workers for foreachBatch and listener

2024-01-23 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu closed SPARK-44460.
---

> Pass user auth credential to Python workers for foreachBatch and listener
> -
>
> Key: SPARK-44460
> URL: https://issues.apache.org/jira/browse/SPARK-44460
> Project: Spark
>  Issue Type: Task
>  Components: Connect, Structured Streaming
>Affects Versions: 3.4.1
>Reporter: Raghu Angadi
>Priority: Major
>
> No user specific credentials are sent to Python worker that runs user 
> functions like foreachBatch() and streaming listener. 
> We might need to pass in these. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-46627) Streaming UI hover-over shows incorrect value

2024-01-08 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-46627:

Attachment: Screenshot 2024-01-08 at 15.06.24.png

> Streaming UI hover-over shows incorrect value
> -
>
> Key: SPARK-46627
> URL: https://issues.apache.org/jira/browse/SPARK-46627
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming, UI, Web UI
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
> Attachments: Screenshot 2024-01-08 at 1.55.57 PM.png, Screenshot 
> 2024-01-08 at 15.06.24.png
>
>
> Running a simple streaming query:
> val df = spark.readStream.format("rate").option("rowsPerSecond", 
> "5000").load()
> val q = df.writeStream.format("noop").start()
>  
> The hover-over value is incorrect in the streaming ui (shows 321.00 at 
> undefined)
>  
> !Screenshot 2024-01-08 at 1.55.57 PM.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-46627) Streaming UI hover-over shows incorrect value

2024-01-08 Thread Wei Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-46627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804513#comment-17804513
 ] 

Wei Liu commented on SPARK-46627:
-

Also, the batch percentages don't add up to 100% now:

!Screenshot 2024-01-08 at 15.06.24.png!

> Streaming UI hover-over shows incorrect value
> -
>
> Key: SPARK-46627
> URL: https://issues.apache.org/jira/browse/SPARK-46627
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming, UI, Web UI
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
> Attachments: Screenshot 2024-01-08 at 1.55.57 PM.png, Screenshot 
> 2024-01-08 at 15.06.24.png
>
>
> Running a simple streaming query:
> val df = spark.readStream.format("rate").option("rowsPerSecond", 
> "5000").load()
> val q = df.writeStream.format("noop").start()
>  
> The hover-over value is incorrect in the streaming ui (shows 321.00 at 
> undefined)
>  
> !Screenshot 2024-01-08 at 1.55.57 PM.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-46627) Streaming UI hover-over shows incorrect value

2024-01-08 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-46627:

Attachment: Screenshot 2024-01-08 at 1.55.57 PM.png

> Streaming UI hover-over shows incorrect value
> -
>
> Key: SPARK-46627
> URL: https://issues.apache.org/jira/browse/SPARK-46627
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming, UI, Web UI
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
> Attachments: Screenshot 2024-01-08 at 1.55.57 PM.png
>
>
> Running a simple streaming query:
> val df = spark.readStream.format("rate").option("rowsPerSecond", 
> "5000").load()
> val q = df.writeStream.format("noop").start()
>  
> The hover-over value is incorrect in the streaming ui:
>  
> !https://files.slack.com/files-tmb/T02727P8HV4-F06CJ83D3JT-b44210f391/image_720.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-46627) Streaming UI hover-over shows incorrect value

2024-01-08 Thread Wei Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-46627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804481#comment-17804481
 ] 

Wei Liu commented on SPARK-46627:
-

Hi Kent : ) [~yao] 

I was wondering if you have context on this issue? Thank you so much!

> Streaming UI hover-over shows incorrect value
> -
>
> Key: SPARK-46627
> URL: https://issues.apache.org/jira/browse/SPARK-46627
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming, UI, Web UI
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
> Attachments: Screenshot 2024-01-08 at 1.55.57 PM.png
>
>
> Running a simple streaming query:
> val df = spark.readStream.format("rate").option("rowsPerSecond", 
> "5000").load()
> val q = df.writeStream.format("noop").start()
>  
> The hover-over value is incorrect in the streaming ui (shows 321.00 at 
> undefined)
>  
> !Screenshot 2024-01-08 at 1.55.57 PM.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-46627) Streaming UI hover-over shows incorrect value

2024-01-08 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-46627:

Description: 
Running a simple streaming query:

val df = spark.readStream.format("rate").option("rowsPerSecond", 
"5000").load()

val q = df.writeStream.format("noop").start()

 

The hover-over value is incorrect in the streaming ui (shows 321.00 at 
undefined)

 
!Screenshot 2024-01-08 at 1.55.57 PM.png!

  was:
Running a simple streaming query:

val df = spark.readStream.format("rate").option("rowsPerSecond", 
"5000").load()

val q = df.writeStream.format("noop").start()

 

The hover-over value is incorrect in the streaming ui:

 
!https://files.slack.com/files-tmb/T02727P8HV4-F06CJ83D3JT-b44210f391/image_720.png!


> Streaming UI hover-over shows incorrect value
> -
>
> Key: SPARK-46627
> URL: https://issues.apache.org/jira/browse/SPARK-46627
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming, UI, Web UI
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
> Attachments: Screenshot 2024-01-08 at 1.55.57 PM.png
>
>
> Running a simple streaming query:
> val df = spark.readStream.format("rate").option("rowsPerSecond", 
> "5000").load()
> val q = df.writeStream.format("noop").start()
>  
> The hover-over value is incorrect in the streaming ui (shows 321.00 at 
> undefined)
>  
> !Screenshot 2024-01-08 at 1.55.57 PM.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46627) Streaming UI hover-over shows incorrect value

2024-01-08 Thread Wei Liu (Jira)
Wei Liu created SPARK-46627:
---

 Summary: Streaming UI hover-over shows incorrect value
 Key: SPARK-46627
 URL: https://issues.apache.org/jira/browse/SPARK-46627
 Project: Spark
  Issue Type: Task
  Components: Structured Streaming, UI, Web UI
Affects Versions: 4.0.0
Reporter: Wei Liu


Running a simple streaming query:

val df = spark.readStream.format("rate").option("rowsPerSecond", 
"5000").load()

val q = df.writeStream.format("noop").start()

 

The hover-over value is incorrect in the streaming ui:

 
!https://files.slack.com/files-tmb/T02727P8HV4-F06CJ83D3JT-b44210f391/image_720.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-46384) Streaming UI doesn't display graph correctly

2023-12-12 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-46384:

Summary: Streaming UI doesn't display graph correctly  (was: Streaming UI 
doesn't show graph)

> Streaming UI doesn't display graph correctly
> 
>
> Key: SPARK-46384
> URL: https://issues.apache.org/jira/browse/SPARK-46384
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming, Web UI
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> The Streaming UI is currently broken on Spark master. Running a simple query:
> ```
> q = 
> spark.readStream.format("rate").load().writeStream.format("memory").queryName("test_wei").start()
> ```
> makes the Spark UI show an empty graph for "operation duration":
> !https://private-user-images.githubusercontent.com/10248890/289990561-fdb78c92-2d6f-41a9-ba23-3068d128caa8.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTEiLCJleHAiOjE3MDI0MjI2NjksIm5iZiI6MTcwMjQyMjM2OSwicGF0aCI6Ii8xMDI0ODg5MC8yODk5OTA1NjEtZmRiNzhjOTItMmQ2Zi00MWE5LWJhMjMtMzA2OGQxMjhjYWE4LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFJV05KWUFYNENTVkVINTNBJTJGMjAyMzEyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjMxMjEyVDIzMDYwOVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWI4N2FkZDYyZGQwZGZmNWJhN2IzMTM3ZmI1MzNhOGExZGY2MThjZjMwZDU5MzZiOTI4ZGVkMjc3MjBhMTNhZjUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.KXhI0NnwIpfTRjVcsXuA82AnaURHgtkLOYVzifI-mp8!
> Here is the error:
> !https://private-user-images.githubusercontent.com/10248890/289990953-cd477d48-a45e-4ee9-b45e-06dc1dbeb9d9.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTEiLCJleHAiOjE3MDI0MjI2NjksIm5iZiI6MTcwMjQyMjM2OSwicGF0aCI6Ii8xMDI0ODg5MC8yODk5OTA5NTMtY2Q0NzdkNDgtYTQ1ZS00ZWU5LWI0NWUtMDZkYzFkYmViOWQ5LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFJV05KWUFYNENTVkVINTNBJTJGMjAyMzEyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjMxMjEyVDIzMDYwOVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWI2MzIyYjQ5OWQ3YWZlMGYzYjFmYTljZjIwZjBmZDBiNzQyZmE3OTI2ZjkxNzVhNWU0ZDAwYTA4NDRkMTRjOTMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.JAqEG_4NCEiRvHO6yv59hZdkH_5_tSUuaOkpEbH-I20!
>  
> I verified the same query runs fine on spark 3.5, as in the following graph.
> !https://private-user-images.githubusercontent.com/10248890/289990563-642aa6c3-7728-43c7-8a11-cbf79c4362c5.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTEiLCJleHAiOjE3MDI0MjI2NjksIm5iZiI6MTcwMjQyMjM2OSwicGF0aCI6Ii8xMDI0ODg5MC8yODk5OTA1NjMtNjQyYWE2YzMtNzcyOC00M2M3LThhMTEtY2JmNzljNDM2MmM1LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFJV05KWUFYNENTVkVINTNBJTJGMjAyMzEyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjMxMjEyVDIzMDYwOVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWM0YmZiYTkyNGZkNzBkOGMzMmUyYTYzZTMyYzQ1ZTZkNDU3MDk2M2ZlOGNlZmQxNGYzNTFjZWRiNTQ2ZmQzZWQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.MJ8-78Qv5KkoLNGBrLXS-8gcC7LZepFsOD4r7pcnzSI!
>  
> This is likely a problem from the library updates; this could be a potential 
> source of the error: [https://github.com/apache/spark/pull/42879]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-46384) Structured Streaming UI doesn't display graph correctly

2023-12-12 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-46384:

Summary: Structured Streaming UI doesn't display graph correctly  (was: 
Streaming UI doesn't display graph correctly)

> Structured Streaming UI doesn't display graph correctly
> ---
>
> Key: SPARK-46384
> URL: https://issues.apache.org/jira/browse/SPARK-46384
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming, Web UI
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> The Streaming UI is currently broken on Spark master. Running a simple query:
> ```
> q = 
> spark.readStream.format("rate").load().writeStream.format("memory").queryName("test_wei").start()
> ```
> makes the Spark UI show an empty graph for "operation duration":
> !https://private-user-images.githubusercontent.com/10248890/289990561-fdb78c92-2d6f-41a9-ba23-3068d128caa8.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTEiLCJleHAiOjE3MDI0MjI2NjksIm5iZiI6MTcwMjQyMjM2OSwicGF0aCI6Ii8xMDI0ODg5MC8yODk5OTA1NjEtZmRiNzhjOTItMmQ2Zi00MWE5LWJhMjMtMzA2OGQxMjhjYWE4LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFJV05KWUFYNENTVkVINTNBJTJGMjAyMzEyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjMxMjEyVDIzMDYwOVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWI4N2FkZDYyZGQwZGZmNWJhN2IzMTM3ZmI1MzNhOGExZGY2MThjZjMwZDU5MzZiOTI4ZGVkMjc3MjBhMTNhZjUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.KXhI0NnwIpfTRjVcsXuA82AnaURHgtkLOYVzifI-mp8!
> Here is the error:
> !https://private-user-images.githubusercontent.com/10248890/289990953-cd477d48-a45e-4ee9-b45e-06dc1dbeb9d9.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTEiLCJleHAiOjE3MDI0MjI2NjksIm5iZiI6MTcwMjQyMjM2OSwicGF0aCI6Ii8xMDI0ODg5MC8yODk5OTA5NTMtY2Q0NzdkNDgtYTQ1ZS00ZWU5LWI0NWUtMDZkYzFkYmViOWQ5LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFJV05KWUFYNENTVkVINTNBJTJGMjAyMzEyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjMxMjEyVDIzMDYwOVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWI2MzIyYjQ5OWQ3YWZlMGYzYjFmYTljZjIwZjBmZDBiNzQyZmE3OTI2ZjkxNzVhNWU0ZDAwYTA4NDRkMTRjOTMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.JAqEG_4NCEiRvHO6yv59hZdkH_5_tSUuaOkpEbH-I20!
>  
> I verified the same query runs fine on spark 3.5, as in the following graph.
> !https://private-user-images.githubusercontent.com/10248890/289990563-642aa6c3-7728-43c7-8a11-cbf79c4362c5.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTEiLCJleHAiOjE3MDI0MjI2NjksIm5iZiI6MTcwMjQyMjM2OSwicGF0aCI6Ii8xMDI0ODg5MC8yODk5OTA1NjMtNjQyYWE2YzMtNzcyOC00M2M3LThhMTEtY2JmNzljNDM2MmM1LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFJV05KWUFYNENTVkVINTNBJTJGMjAyMzEyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjMxMjEyVDIzMDYwOVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWM0YmZiYTkyNGZkNzBkOGMzMmUyYTYzZTMyYzQ1ZTZkNDU3MDk2M2ZlOGNlZmQxNGYzNTFjZWRiNTQ2ZmQzZWQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.MJ8-78Qv5KkoLNGBrLXS-8gcC7LZepFsOD4r7pcnzSI!
>  
> This is likely a problem from the library updates; this could be a potential 
> source of the error: [https://github.com/apache/spark/pull/42879]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46384) Streaming UI doesn't show graph

2023-12-12 Thread Wei Liu (Jira)
Wei Liu created SPARK-46384:
---

 Summary: Streaming UI doesn't show graph
 Key: SPARK-46384
 URL: https://issues.apache.org/jira/browse/SPARK-46384
 Project: Spark
  Issue Type: Task
  Components: Structured Streaming, Web UI
Affects Versions: 4.0.0
Reporter: Wei Liu


The Streaming UI is currently broken on Spark master. Running a simple query:

```

q = 
spark.readStream.format("rate").load().writeStream.format("memory").queryName("test_wei").start()

```

makes the Spark UI show an empty graph for "operation duration":

!https://private-user-images.githubusercontent.com/10248890/289990561-fdb78c92-2d6f-41a9-ba23-3068d128caa8.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTEiLCJleHAiOjE3MDI0MjI2NjksIm5iZiI6MTcwMjQyMjM2OSwicGF0aCI6Ii8xMDI0ODg5MC8yODk5OTA1NjEtZmRiNzhjOTItMmQ2Zi00MWE5LWJhMjMtMzA2OGQxMjhjYWE4LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFJV05KWUFYNENTVkVINTNBJTJGMjAyMzEyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjMxMjEyVDIzMDYwOVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWI4N2FkZDYyZGQwZGZmNWJhN2IzMTM3ZmI1MzNhOGExZGY2MThjZjMwZDU5MzZiOTI4ZGVkMjc3MjBhMTNhZjUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.KXhI0NnwIpfTRjVcsXuA82AnaURHgtkLOYVzifI-mp8!

Here is the error:

!https://private-user-images.githubusercontent.com/10248890/289990953-cd477d48-a45e-4ee9-b45e-06dc1dbeb9d9.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTEiLCJleHAiOjE3MDI0MjI2NjksIm5iZiI6MTcwMjQyMjM2OSwicGF0aCI6Ii8xMDI0ODg5MC8yODk5OTA5NTMtY2Q0NzdkNDgtYTQ1ZS00ZWU5LWI0NWUtMDZkYzFkYmViOWQ5LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFJV05KWUFYNENTVkVINTNBJTJGMjAyMzEyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjMxMjEyVDIzMDYwOVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWI2MzIyYjQ5OWQ3YWZlMGYzYjFmYTljZjIwZjBmZDBiNzQyZmE3OTI2ZjkxNzVhNWU0ZDAwYTA4NDRkMTRjOTMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.JAqEG_4NCEiRvHO6yv59hZdkH_5_tSUuaOkpEbH-I20!

 

I verified the same query runs fine on spark 3.5, as in the following graph.

!https://private-user-images.githubusercontent.com/10248890/289990563-642aa6c3-7728-43c7-8a11-cbf79c4362c5.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTEiLCJleHAiOjE3MDI0MjI2NjksIm5iZiI6MTcwMjQyMjM2OSwicGF0aCI6Ii8xMDI0ODg5MC8yODk5OTA1NjMtNjQyYWE2YzMtNzcyOC00M2M3LThhMTEtY2JmNzljNDM2MmM1LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFJV05KWUFYNENTVkVINTNBJTJGMjAyMzEyMTIlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjMxMjEyVDIzMDYwOVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWM0YmZiYTkyNGZkNzBkOGMzMmUyYTYzZTMyYzQ1ZTZkNDU3MDk2M2ZlOGNlZmQxNGYzNTFjZWRiNTQ2ZmQzZWQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.MJ8-78Qv5KkoLNGBrLXS-8gcC7LZepFsOD4r7pcnzSI!

 

This is likely a problem from the library updates; this could be a potential 
source of the error: [https://github.com/apache/spark/pull/42879]

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46279) Support write partition values to data files

2023-12-05 Thread fred liu (Jira)
fred liu created SPARK-46279:


 Summary: Support write partition values to data files
 Key: SPARK-46279
 URL: https://issues.apache.org/jira/browse/SPARK-46279
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: fred liu


Supporting writing partition values to data files would give the flexibility to 
allow Parquet files to be read correctly without relying on the engine to read 
partition values from the path, and would enable cases where individual Parquet 
files can be copied and shared.
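For context, a small PySpark sketch of today's behavior that motivates this (the /tmp path and toy data are purely illustrative):
{code:java}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "part"])
df.write.partitionBy("part").mode("overwrite").parquet("/tmp/part_demo")

# Today the partition value lives only in the directory name (part=a/, part=b/),
# so reading a single partition directory or data file directly loses it:
spark.read.parquet("/tmp/part_demo/part=a").printSchema()   # only `id`, no `part`
{code}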



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46250) Deflake test_parity_listener

2023-12-04 Thread Wei Liu (Jira)
Wei Liu created SPARK-46250:
---

 Summary: Deflake test_parity_listener
 Key: SPARK-46250
 URL: https://issues.apache.org/jira/browse/SPARK-46250
 Project: Spark
  Issue Type: Task
  Components: Connect, SS
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45845) Streaming UI add number of evicted state rows

2023-11-08 Thread Wei Liu (Jira)
Wei Liu created SPARK-45845:
---

 Summary: Streaming UI add number of evicted state rows
 Key: SPARK-45845
 URL: https://issues.apache.org/jira/browse/SPARK-45845
 Project: Spark
  Issue Type: Task
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Wei Liu


The UI is missing this chart, and people always confuse "aggregated number of 
rows dropped by watermark" with this newly added metric



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45834) Fix Pearson correlation calculation more stable

2023-11-07 Thread Jiayi Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiayi Liu updated SPARK-45834:
--
Description: 
Spark uses the formula {{ck / sqrt(xMk * yMk)}} to calculate the Pearson 
Correlation Coefficient. If {{xMk}} and {{yMk}} are very small, their 
double-precision product can underflow to 0, resulting in a denominator of 0. This 
leads to an Infinity result in the calculation.

For example, when calculating the correlation for the same columns a and b in a 
table, the result will be Infinity, but the correlation for identical columns 
should be 1.0 instead.
||a||b||
|1e-200|1e-200|
|1e-200|1e-200|
|1e-100|1e-100|

Modifying the formula to {{ck / sqrt(xMk) / sqrt(yMk)}} can indeed solve this 
issue and improve the stability of the calculation. The benefit of this 
modification is that it splits the square root of the denominator into two 
parts: {{sqrt(xMk)}} and {{{}sqrt(yMk){}}}. This helps avoid multiplication 
overflow or cases where the product of extremely small values becomes zero.
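For illustration only (this is not Spark's Corr implementation, and the ck/xMk/yMk values are made up to be on the same scale as the 1e-200 inputs above), a small Python sketch of the numerical difference between the two formulas:
{code:java}
import math

# Illustrative co-moment values; not taken from Spark.
ck = 1e-250
xMk = 1e-250
yMk = 1e-250

# Old formula: the product xMk * yMk underflows to 0.0 in double precision,
# so the denominator becomes 0 and the result blows up.
product = xMk * yMk                                         # 0.0 after underflow
old = ck / math.sqrt(product) if product > 0.0 else float("inf")

# New formula: dividing by the two square roots separately stays finite.
new = ck / math.sqrt(xMk) / math.sqrt(yMk)

print(old)   # inf
print(new)   # ~1.0
{code}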
 
 

  was:
Spark uses the formula {{ck / sqrt(xMk * yMk)}} to calculate the Pearson 
Correlation Coefficient. If {{xMk}} and {{yMk}} are very small, it can lead to 
double multiplication overflow, resulting in a denominator of 0. This leads to 
a NaN result in the calculation.

For example, when calculating the correlation for the same columns a and b in a 
table, the result will be Infinity, but the correlation for identical columns 
should be 1.0 instead.
||a||b||
|1e-200|1e-200|
|1e-200|1e-200|
|1e-100|1e-100|

Modifying the formula to {{ck / sqrt(xMk) / sqrt(yMk)}} can indeed solve this 
issue and improve the stability of the calculation. The benefit of this 
modification is that it splits the square root of the denominator into two 
parts: {{sqrt(xMk)}} and {{{}sqrt(yMk){}}}. This helps avoid multiplication 
overflow or cases where the product of extremely small values becomes zero.
 
 


> Fix Pearson correlation calculation more stable
> ---
>
> Key: SPARK-45834
> URL: https://issues.apache.org/jira/browse/SPARK-45834
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Jiayi Liu
>Priority: Major
>
> Spark uses the formula {{ck / sqrt(xMk * yMk)}} to calculate the Pearson 
> Correlation Coefficient. If {{xMk}} and {{yMk}} are very small, their 
> double-precision product can underflow to 0, resulting in a denominator of 0. 
> This leads to an Infinity result in the calculation.
> For example, when calculating the correlation for the same columns a and b in 
> a table, the result will be Infinity, but the correlation for identical 
> columns should be 1.0 instead.
> ||a||b||
> |1e-200|1e-200|
> |1e-200|1e-200|
> |1e-100|1e-100|
> Modifying the formula to {{ck / sqrt(xMk) / sqrt(yMk)}} can indeed solve this 
> issue and improve the stability of the calculation. The benefit of this 
> modification is that it splits the square root of the denominator into two 
> parts: {{sqrt(xMk)}} and {{{}sqrt(yMk){}}}. This helps avoid multiplication 
> overflow or cases where the product of extremely small values becomes zero.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45729) Fix PySpark testing guide links

2023-10-30 Thread Amanda Liu (Jira)
Amanda Liu created SPARK-45729:
--

 Summary: Fix PySpark testing guide links
 Key: SPARK-45729
 URL: https://issues.apache.org/jira/browse/SPARK-45729
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 3.5.0
Reporter: Amanda Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45637) Time window aggregation in separate streams followed by stream-stream join not returning results

2023-10-27 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-45637:

Description: 
According to documentation update (SPARK-42591) resulting from SPARK-42376, 
Spark 3.5.0 should support time-window aggregations in two separate streams 
followed by stream-stream window join:

[https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995]

However, I failed to reproduce this example and the query I built doesn't 
return any results:
{code:java}
from pyspark.sql.functions import rand
from pyspark.sql.functions import expr, window, window_time

spark.conf.set("spark.sql.shuffle.partitions", "1")

impressions = (
spark
.readStream.format("rate").option("rowsPerSecond", 
"5").option("numPartitions", "1").load()
.selectExpr("value AS adId", "timestamp AS impressionTime")
)

impressionsWithWatermark = impressions \
.selectExpr("adId AS impressionAdId", "impressionTime") \
.withWatermark("impressionTime", "10 seconds")

clicks = (  
spark  
.readStream.format("rate").option("rowsPerSecond", 
"5").option("numPartitions", "1").load()  
.where((rand() * 100).cast("integer") < 10)  # 10 out of every 100 
impressions result in a click  
.selectExpr("(value - 10) AS adId ", "timestamp AS clickTime")  # -10 so 
that a click with same id as impression is generated later (i.e. delayed data).
    .where("adId > 0")  
) 

clicksWithWatermark = clicks \
.selectExpr("adId AS clickAdId", "clickTime") \
.withWatermark("clickTime", "10 seconds")

clicksWindow = clicksWithWatermark.groupBy(  
window(clicksWithWatermark.clickTime, "1 minute")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
window(impressionsWithWatermark.impressionTime, "1 minute")
).count()

clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")

clicksAndImpressions.writeStream \
.format("memory") \
.queryName("clicksAndImpressions") \
.outputMode("append") \
.start() {code}
 

My intuition is that I'm getting no results because to output results of the 
first stateful operator (time window aggregation), a watermark needs to pass 
the end timestamp of the window. And once the watermark is after the end 
timestamp of the window, this window is ignored at the second stateful operator 
(stream-stream) join because it's behind the watermark. Indeed, a small hack 
applied to the event time column (adding one minute) between the two stateful 
operators makes it possible to get results:
{code:java}
clicksWindow2 = clicksWithWatermark.groupBy( 
window(clicksWithWatermark.clickTime, "1 minute")
).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 
MINUTE')).drop("window")

impressionsWindow2 = impressionsWithWatermark.groupBy(
window(impressionsWithWatermark.impressionTime, "1 minute")
).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 
MINUTE')).drop("window")

clicksAndImpressions2 = clicksWindow2.join(impressionsWindow2, "window_time", 
"inner")

clicksAndImpressions2.writeStream \
.format("memory") \
.queryName("clicksAndImpressions2") \
.outputMode("append") \
.start()  {code}
 

  was:
According to documentation update (SPARK-42591) resulting from SPARK-42376, 
Spark 3.5.0 should support time-window aggregations in two separate streams 
followed by stream-stream window join:

https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995

However, I failed to reproduce this example and the query I built doesn't 
return any results:
{code:java}
from pyspark.sql.functions import rand
from pyspark.sql.functions import expr, window, window_time

spark.conf.set("spark.sql.shuffle.partitions", "1")

impressions = (
spark
.readStream.format("rate").option("rowsPerSecond", 
"5").option("numPartitions", "1").load()
.selectExpr("value AS adId", "timestamp AS impressionTime")
)

impressionsWithWatermark = impressions \
.selectExpr("adId AS impressionAdId", "impressionTime") \
.withWatermark("impressionTime", "10 seconds")

clicks = (  
spark  
.readStream.format("rate").option("rowsPerSecond", 
"5").option("numPartitions", "1").load()  
.where((rand() * 100).cast("integer") < 10)  # 10 out of every 100 
impressions result in a click  
.selectExpr("(value - 10) AS adId ", "timestamp AS clickTime")  # -10 so 
that a click with same id as impression is generated later (i.e. delayed data). 
 .where("adId > 0")
) 

clicksWithWatermark = clicks \
.selectExpr("adId AS clickAdId", "clickTime") \
.withWatermark("clickTime", "10 seconds")

clicksWindow = clicksWithWatermark.groupBy(  
window(clicksWithWatermark.clickTime, "1 minute")
).count()

impressionsWindow 

[jira] [Created] (SPARK-45677) Observe API error logging

2023-10-26 Thread Wei Liu (Jira)
Wei Liu created SPARK-45677:
---

 Summary: Observe API error logging
 Key: SPARK-45677
 URL: https://issues.apache.org/jira/browse/SPARK-45677
 Project: Spark
  Issue Type: Task
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Wei Liu


We should tell the user why it's not supported and what to do:

[https://github.com/apache/spark/blob/536439244593d40bdab88e9d3657f2691d3d33f2/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala#L76]
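A hedged sketch of the pattern in question (assuming, as the linked code suggests, that the blocking Observation helper rejects streaming Datasets, while a named observation plus a listener is the supported route for streams):
{code:java}
from pyspark.sql import SparkSession, Observation
from pyspark.sql.functions import count

spark = SparkSession.builder.getOrCreate()
stream_df = spark.readStream.format("rate").load()

obs = Observation("metrics")
# This is the call whose error message should explain why it is not supported
# and point the user at the alternative (assumed behavior):
# stream_df.observe(obs, count("*").alias("rows"))

# Supported alternative for streaming: a named observation; the metrics then show
# up in the query progress / StreamingQueryListener events.
observed = stream_df.observe("metrics", count("*").alias("rows"))
{code}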



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45053) Improve python version mismatch logging

2023-09-01 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-45053:

Description: Currently the wording of the Python version mismatch error is a 
little bit confusing: it uses (3, 9) to represent Python version 3.9. This is just 
a minor update to make it more straightforward.
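A minimal sketch of the kind of change meant here (hypothetical helper, not the actual patch):
{code:java}
import sys

def format_py_version(version_info=sys.version_info):
    # Render "3.9" instead of the tuple repr "(3, 9)".
    return "%d.%d" % (version_info.major, version_info.minor)

# Hypothetical use in the mismatch message (worker_info / driver_info are
# placeholders for the two version tuples being compared):
# "Python in worker has different version %s than that in driver %s ..."
#     % (format_py_version(worker_info), format_py_version(driver_info))
{code}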

> Improve python version mismatch logging
> ---
>
> Key: SPARK-45053
> URL: https://issues.apache.org/jira/browse/SPARK-45053
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 3.4.1
>Reporter: Wei Liu
>Priority: Trivial
>
> Currently the wording of the Python version mismatch error is a little bit 
> confusing: it uses (3, 9) to represent Python version 3.9. This is just a minor 
> update to make it more straightforward.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45056) Add process termination tests for Python foreachBatch and StreamingQueryListener

2023-09-01 Thread Wei Liu (Jira)
Wei Liu created SPARK-45056:
---

 Summary: Add process termination tests for Python foreachBatch and 
StreamingQueryListener
 Key: SPARK-45056
 URL: https://issues.apache.org/jira/browse/SPARK-45056
 Project: Spark
  Issue Type: Task
  Components: Connect, Structured Streaming
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45053) Improve python version mismatch logging

2023-09-01 Thread Wei Liu (Jira)
Wei Liu created SPARK-45053:
---

 Summary: Improve python version mismatch logging
 Key: SPARK-45053
 URL: https://issues.apache.org/jira/browse/SPARK-45053
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44971) [BUG Fix] PySpark StreamingQuerProgress fromJson

2023-08-25 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-44971:

Issue Type: Bug  (was: New Feature)

> [BUG Fix] PySpark StreamingQuerProgress fromJson 
> -
>
> Key: SPARK-44971
> URL: https://issues.apache.org/jira/browse/SPARK-44971
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Wei Liu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44971) [BUG Fix] PySpark StreamingQuerProgress fromJson

2023-08-25 Thread Wei Liu (Jira)
Wei Liu created SPARK-44971:
---

 Summary: [BUG Fix] PySpark StreamingQuerProgress fromJson 
 Key: SPARK-44971
 URL: https://issues.apache.org/jira/browse/SPARK-44971
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 3.5.0, 3.5.1
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44930) Deterministic ApplyFunctionExpression should be foldable

2023-08-23 Thread Xianyang Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianyang Liu updated SPARK-44930:
-
Description: Currently, ApplyFunctionExpression is unfoldable because it 
inherits the default value from Expression.  However, it should be foldable for 
a deterministic ApplyFunctionExpression. This could help optimize the usage for 
V2 UDF applying to constant expressions.  (was: Currently, 
ApplyFunctionExpression is unfoldable because inherits the default value from 
Expression.  However, it should be foldable for a deterministic 
ApplyFunctionExpression. This could help optimize the usage for V2 UDF applying 
on constant expression.)

> Deterministic ApplyFunctionExpression should be foldable
> 
>
> Key: SPARK-44930
> URL: https://issues.apache.org/jira/browse/SPARK-44930
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.1
>Reporter: Xianyang Liu
>Priority: Major
>
> Currently, ApplyFunctionExpression is unfoldable because it inherits the default 
> value from Expression.  However, it should be foldable for a deterministic 
> ApplyFunctionExpression. This could help optimize the usage for V2 UDF 
> applying to constant expressions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44930) Deterministic ApplyFunctionExpression should be foldable

2023-08-23 Thread Xianyang Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianyang Liu updated SPARK-44930:
-
Description: Currently, ApplyFunctionExpression is unfoldable because it 
inherits the default value from Expression.  However, it should be foldable for 
a deterministic ApplyFunctionExpression. This could help optimize the usage for 
V2 UDF applying on constant expression.  (was: Currently, 
ApplyFunctionExpression is unfoldable because inherits the default value from 
Expression.  However, it should be foldable for a deterministic 
ApplyFunctionExpression. This could help optimize the usage for V2 UDF applying 
on constant literal.)

> Deterministic ApplyFunctionExpression should be foldable
> 
>
> Key: SPARK-44930
> URL: https://issues.apache.org/jira/browse/SPARK-44930
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.1
>Reporter: Xianyang Liu
>Priority: Major
>
> Currently, ApplyFunctionExpression is unfoldable because it inherits the default 
> value from Expression.  However, it should be foldable for a deterministic 
> ApplyFunctionExpression. This could help optimize the usage for V2 UDF 
> applying on constant expression.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44930) Deterministic ApplyFunctionExpression should be foldable

2023-08-23 Thread Xianyang Liu (Jira)
Xianyang Liu created SPARK-44930:


 Summary: Deterministic ApplyFunctionExpression should be foldable
 Key: SPARK-44930
 URL: https://issues.apache.org/jira/browse/SPARK-44930
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.4.1
Reporter: Xianyang Liu


Currently, ApplyFunctionExpression is unfoldable because it inherits the default 
value from Expression.  However, it should be foldable for a deterministic 
ApplyFunctionExpression. This could help optimize the usage for V2 UDF applying 
on constant literal.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-44917) PySpark Streaming DataStreamWriter table API

2023-08-22 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu resolved SPARK-44917.
-
Resolution: Not A Problem

> PySpark Streaming DataStreamWriter table API
> 
>
> Key: SPARK-44917
> URL: https://issues.apache.org/jira/browse/SPARK-44917
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44917) PySpark Streaming DataStreamWriter table API

2023-08-22 Thread Wei Liu (Jira)
Wei Liu created SPARK-44917:
---

 Summary: PySpark Streaming DataStreamWriter table API
 Key: SPARK-44917
 URL: https://issues.apache.org/jira/browse/SPARK-44917
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Wei Liu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44913) DS V2 supports push down V2 UDF that has magic method

2023-08-22 Thread Xianyang Liu (Jira)
Xianyang Liu created SPARK-44913:


 Summary: DS V2 supports push down V2 UDF that has magic method
 Key: SPARK-44913
 URL: https://issues.apache.org/jira/browse/SPARK-44913
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.4.1
Reporter: Xianyang Liu


Right now we only support pushing down a V2 UDF that does not have a magic method, 
because such a V2 UDF is analyzed into an `ApplyFunctionExpression`, which can be 
translated and pushed down. However, a V2 UDF that has a magic method is analyzed 
into `StaticInvoke` or `Invoke`, which cannot be translated into a V2 expression 
and therefore cannot be pushed down to the data source. Since the magic method is 
the recommended approach, this PR adds support for pushing down V2 UDFs that have 
a magic method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-44460) Pass user auth credential to Python workers for foreachBatch and listener

2023-08-21 Thread Wei Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757087#comment-17757087
 ] 

Wei Liu commented on SPARK-44460:
-

[~rangadi] This seems to be a Databricks internal issue. See the updates in 
SC-138245

> Pass user auth credential to Python workers for foreachBatch and listener
> -
>
> Key: SPARK-44460
> URL: https://issues.apache.org/jira/browse/SPARK-44460
> Project: Spark
>  Issue Type: Task
>  Components: Connect, Structured Streaming
>Affects Versions: 3.4.1
>Reporter: Raghu Angadi
>Priority: Major
>
> No user specific credentials are sent to Python worker that runs user 
> functions like foreachBatch() and streaming listener. 
> We might need to pass in these. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44839) Better error logging when user accesses spark session in foreachBatch and Listener

2023-08-16 Thread Wei Liu (Jira)
Wei Liu created SPARK-44839:
---

 Summary: Better error logging when user accesses spark session in 
foreachBatch and Listener
 Key: SPARK-44839
 URL: https://issues.apache.org/jira/browse/SPARK-44839
 Project: Spark
  Issue Type: New Feature
  Components: Connect, Structured Streaming
Affects Versions: 3.5.0, 4.0.0
Reporter: Wei Liu


TypeError: cannot pickle '_thread._local' object

 

This is raised when the user accesses `spark` inside foreachBatch or a listener; we need a better error for this.
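A hedged repro sketch of the pattern that triggers it (assumes a PySpark Connect shell where `spark` is the client session and `df` is some streaming DataFrame; the writeStream lines are left commented out):
{code:java}
# The foreachBatch function is pickled and shipped to the server under Spark
# Connect; capturing the client-side `spark` session in the closure is what
# raises "TypeError: cannot pickle '_thread._local' object".
def bad_batch_fn(batch_df, batch_id):
    spark.range(1).show()                  # captures the outer session -> not picklable

def good_batch_fn(batch_df, batch_id):
    batch_df.sparkSession.range(1).show()  # use the session attached to the batch

# df.writeStream.foreachBatch(bad_batch_fn).start()    # fails with the pickle error
# df.writeStream.foreachBatch(good_batch_fn).start()   # works
{code}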



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect

2023-08-14 Thread Wei Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17754280#comment-17754280
 ] 

Wei Liu commented on SPARK-44808:
-

This seems to be against the design principle of Spark Connect: we pass 
everything to the server side, and the client only keeps ids to get status. That's 
the reason why foreachBatch and the Listener are run on the server side. This can 
be closed

> refreshListener() API on StreamingQueryManager for spark connect
> 
>
> Key: SPARK-44808
> URL: https://issues.apache.org/jira/browse/SPARK-44808
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> I’m thinking of an improvement for connect python listener and foreachBatch. 
> Currently if you define a variable outside of the function, you can’t 
> actually see it on client side if it’s touched in the function because the 
> operation is on server side, e.g. 
>  
>  
> {code:java}
> x = 0
> class MyListener(StreamingQueryListener):
>     def onQueryStarted(e):
>         x = 100
>         self.y = 200
> spark.streams.addListener(MyListener())
> q = spark.readStream.start()
>  
> # x is still 0, self.y is not defined
> {code}
>  
>  
> But for the self.y case, there could be an improvement. 
>  
> The server side is capable of pickle-serializing the whole listener instance 
> again and sending it back to the client.
>  
> So we could define a new interface on the streaming query manager, maybe 
> called refreshListener(listener), e.g. use it as 
> `spark.streams.refreshListener()`
>  
>  
> {code:java}
> def refreshListener(listener: StreamingQueryListener) -> 
> StreamingQueryListener
>  # send the listener id with this refresh request, server receives this 
> request and serializes the listener again, and send back to client, so the 
> returned new listener contains the updated value self.y
>  
> {code}
>  
> For `foreachBatch`, we could wrap the function in a new class
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect

2023-08-14 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu resolved SPARK-44808.
-
Resolution: Won't Do

> refreshListener() API on StreamingQueryManager for spark connect
> 
>
> Key: SPARK-44808
> URL: https://issues.apache.org/jira/browse/SPARK-44808
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> I’m thinking of an improvement for connect python listener and foreachBatch. 
> Currently if you define a variable outside of the function, you can’t 
> actually see it on client side if it’s touched in the function because the 
> operation is on server side, e.g. 
>  
>  
> {code:java}
> x = 0
> class MyListener(StreamingQueryListener):
>     def onQueryStarted(e):
>         x = 100
>         self.y = 200
> spark.streams.addListener(MyListener())
> q = spark.readStream.start()
>  
> # x is still 0, self.y is not defined
> {code}
>  
>  
> But for the self.y case, there could be an improvement. 
>  
> The server side is capable of pickle-serializing the whole listener instance 
> again and sending it back to the client.
>  
> So we could define a new interface on the streaming query manager, maybe 
> called refreshListener(listener), e.g. use it as 
> `spark.streams.refreshListener()`
>  
>  
> {code:java}
> def refreshListener(listener: StreamingQueryListener) -> 
> StreamingQueryListener
>  # send the listener id with this refresh request, server receives this 
> request and serializes the listener again, and send back to client, so the 
> returned new listener contains the updated value self.y
>  
> {code}
>  
> For `foreachBatch`, we could wrap the function in a new class
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect

2023-08-14 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-44808:

Description: 
I’m thinking of an improvement for connect python listener and foreachBatch. 
Currently if you define a variable outside of the function, you can’t actually 
see it on client side if it’s touched in the function because the operation is 
on server side, e.g. 

 

 
{code:java}
x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(e):
        x = 100
        self.y = 200
spark.streams.addListener(MyListener())
q = spark.readStream.start()
 
# x is still 0, self.y is not defined
{code}
 

 

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle-serializing the whole listener instance 
again and sending it back to the client.

 

So we could define a new interface on the streaming query manager, maybe called 
refreshListener(listener), e.g. use it as `spark.streams.refreshListener()`

 

 
{code:java}
def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener
 # send the listener id with this refresh request, server receives this request 
and serializes the listener again, and send back to client, so the returned new 
listener contains the updated value self.y
 
{code}
 

For `foreachBatch`, we could wrap the function in a new class

 

  was:
I’m thinking of an improvement for connect python listener and foreachBatch. 
Currently if you define a variable outside of the function, you can’t actually 
see it on client side if it’s touched in the function because the operation is 
on server side, e.g. 

 

 
{code:java}
x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(e):
        x = 100
        self.y = 200
spark.streams.addListener(MyListener())
q = spark.readStream.start()
 
# x is still 0, self.y is not defined
{code}
 

 

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle serialize the whole listener instance 
again, and send back to the client.

 

So if we define a new interface on the streaming query manager, maybe called 
refreshListener(listener), e.g. use it as `spark.streams.refreshListener()`

 

 
{code:java}
def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener
 # send the listener id with this refresh request, server receives this request 
and serializes the listener again, and send back to client, so the returned new 
listener contains the updated value self.y
 
{code}
 

For `foreachBatch`, we could wrap the function to a new class

 


> refreshListener() API on StreamingQueryManager for spark connect
> 
>
> Key: SPARK-44808
> URL: https://issues.apache.org/jira/browse/SPARK-44808
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> I’m thinking of an improvement for connect python listener and foreachBatch. 
> Currently if you define a variable outside of the function, you can’t 
> actually see it on client side if it’s touched in the function because the 
> operation is on server side, e.g. 
>  
>  
> {code:java}
> x = 0
> class MyListener(StreamingQueryListener):
>     def onQueryStarted(e):
>         x = 100
>         self.y = 200
> spark.streams.addListener(MyListener())
> q = spark.readStream.start()
>  
> # x is still 0, self.y is not defined
> {code}
>  
>  
> But for the self.y case, there could be an improvement. 
>  
> The server side is capable of pickle-serializing the whole listener instance 
> again and sending it back to the client.
>  
> So we could define a new interface on the streaming query manager, maybe 
> called refreshListener(listener), e.g. use it as 
> `spark.streams.refreshListener()`
>  
>  
> {code:java}
> def refreshListener(listener: StreamingQueryListener) -> 
> StreamingQueryListener
>  # send the listener id with this refresh request, server receives this 
> request and serializes the listener again, and send back to client, so the 
> returned new listener contains the updated value self.y
>  
> {code}
>  
> For `foreachBatch`, we could wrap the function in a new class
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect

2023-08-14 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-44808:

Description: 
I’m thinking of an improvement for connect python listener and foreachBatch. 
Currently if you define a variable outside of the function, you can’t actually 
see it on client side if it’s touched in the function because the operation is 
on server side, e.g. 

 

 
{code:java}
x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(e):
        x = 100
        self.y = 200
spark.streams.addListener(MyListener())
q = spark.readStream.start()
 
# x is still 0, self.y is not defined
{code}
 

 

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle-serializing the whole listener instance 
again and sending it back to the client.

 

So if we define a new interface on the streaming query manager, maybe called 
refreshListener(listener), e.g. use it as `spark.streams.refreshListener()`

 

 
{code:java}
def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener
 # send the listener id with this refresh request, server receives this request 
and serializes the listener again, and send back to client, so the returned new 
listener contains the updated value self.y
 
{code}
 

For `foreachBatch`, we could wrap the function in a new class

 

  was:
I’m thinking of an improvement for connect python listener and foreachBatch. 
Currently if you define a variable outside of the function, you can’t actually 
see it on client side if it’s touched in the function because the operation is 
on server side, e.g. 

 

 
{code:java}
x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(e):
        x = 100
        self.y = 200
spark.streams.addListener(MyListener())
q = spark.readStream.start()
 
# x is still 0, self.y is not defined
{code}
 

 

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle serialize the whole listener instance 
again, and send back to the client.

 

So if we define a new interface on the streaming query manager, maybe called 
refreshListener(listener), e.g. use it as `spark.streams.refreshListener()`

 

 
{code:java}
def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener
 # send the listener id with this refresh request, server receives this request 
and serializes the listener again, and send back to client, so the returned new 
listener contains the updated value self.y
 
{code}
 

For `foreachBatch`, we might could wrap the function to a new class, like the 
listener case

 


> refreshListener() API on StreamingQueryManager for spark connect
> 
>
> Key: SPARK-44808
> URL: https://issues.apache.org/jira/browse/SPARK-44808
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> I’m thinking of an improvement for connect python listener and foreachBatch. 
> Currently if you define a variable outside of the function, you can’t 
> actually see it on client side if it’s touched in the function because the 
> operation is on server side, e.g. 
>  
>  
> {code:java}
> x = 0
> class MyListener(StreamingQueryListener):
>     def onQueryStarted(e):
>         x = 100
>         self.y = 200
> spark.streams.addListener(MyListener())
> q = spark.readStream.start()
>  
> # x is still 0, self.y is not defined
> {code}
>  
>  
> But for the self.y case, there could be an improvement. 
>  
> The server side is capable of pickle-serializing the whole listener instance 
> again and sending it back to the client.
>  
> So if we define a new interface on the streaming query manager, maybe called 
> refreshListener(listener), e.g. use it as `spark.streams.refreshListener()`
>  
>  
> {code:java}
> def refreshListener(listener: StreamingQueryListener) -> 
> StreamingQueryListener
>  # send the listener id with this refresh request, server receives this 
> request and serializes the listener again, and send back to client, so the 
> returned new listener contains the updated value self.y
>  
> {code}
>  
> For `foreachBatch`, we could wrap the function in a new class
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect

2023-08-14 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-44808:

Description: 
I’m thinking of an improvement for connect python listener and foreachBatch. 
Currently if you define a variable outside of the function, you can’t actually 
see it on client side if it’s touched in the function because the operation is 
on server side, e.g. 

 

 
{code:java}
x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(e):
        x = 100
        self.y = 200
spark.streams.addListener(MyListener())
q = spark.readStream.start()
 
# x is still 0, self.y is not defined
{code}
 

 

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle-serializing the whole listener instance 
again and sending it back to the client.

 

So if we define a new interface on the streaming query manager, maybe called 
refreshListener(listener), e.g. use it as `spark.streams.refreshListener()`

 

 
{code:java}
def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener
 # send the listener id with this refresh request, server receives this request 
and serializes the listener again, and send back to client, so the returned new 
listener contains the updated value self.y
 
{code}
 

For `foreachBatch`, we might wrap the function in a new class, like the 
listener case

 

  was:
I’m thinking of an improvement for connect python listener and foreachBatch. 
Currently if you define a variable outside of the function, you can’t actually 
see it on client side if it’s touched in the function because the operation is 
on server side, e.g. 

 

 
{code:java}
x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(e):
        x = 100
        self.y = 200
spark.streams.addListener(MyListener())
q = spark.readStream.start()
 
# x is still 0, self.y is not defined
{code}
 

 

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle serialize the whole listener instance 
again, and send back to the client.

 

So if we define a new interface on the streaming query manager, maybe called 
refreshListener(listener), e.g. use it as `spark.streams.refreshListener()`

 

 
{code:java}
def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener
 # send the listener id with this refresh request, server receives this request 
and serializes the listener again, and send back to client, so the returned new 
listener contains the updated value self.y
 
{code}
 

 

 


> refreshListener() API on StreamingQueryManager for spark connect
> 
>
> Key: SPARK-44808
> URL: https://issues.apache.org/jira/browse/SPARK-44808
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> I’m thinking of an improvement for the Spark Connect Python listener and 
> foreachBatch. Currently, if you define a variable outside of the function, you 
> can’t actually see its updated value on the client side when the function 
> modifies it, because the function runs on the server side, e.g.:
>  
>  
> {code:python}
> x = 0
> class MyListener(StreamingQueryListener):
>     def onQueryStarted(self, e):
>         x = 100          # rebinds a local name; the client-side x is untouched
>         self.y = 200     # set only on the server-side copy of the listener
> spark.streams.addListener(MyListener())
> q = spark.readStream.start()
>  
> # x is still 0, self.y is not defined
> {code}
>  
>  
> But for the self.y case, there could be an improvement. 
>  
> The server side is capable of pickle-serializing the whole listener instance 
> again and sending it back to the client.
>  
> So we could define a new interface on the streaming query manager, maybe called 
> refreshListener(listener), used as `spark.streams.refreshListener()`:
>  
>  
> {code:python}
> def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener:
>     """Send the listener id with this refresh request; the server receives the
>     request, serializes the listener again, and sends it back to the client, so
>     the returned new listener contains the updated value self.y."""
>     ...
> {code}
>  
> For `foreachBatch`, we could wrap the function in a new class, like the 
> listener case.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect

2023-08-14 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-44808:

Description: 
I’m thinking of an improvement for the Spark Connect Python listener and 
foreachBatch. Currently, if you define a variable outside of the function, you 
can’t actually see its updated value on the client side when the function 
modifies it, because the function runs on the server side, e.g.:

 

 
{code:python}
x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, e):
        x = 100          # rebinds a local name; the client-side x is untouched
        self.y = 200     # set only on the server-side copy of the listener
spark.streams.addListener(MyListener())
q = spark.readStream.start()
 
# x is still 0, self.y is not defined
{code}
 

 

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle-serializing the whole listener instance 
again and sending it back to the client.

 

So we could define a new interface on the streaming query manager, maybe called 
refreshListener(listener), used as `spark.streams.refreshListener()`:

 

 
{code:python}
def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener:
    """Send the listener id with this refresh request; the server receives the
    request, serializes the listener again, and sends it back to the client, so
    the returned new listener contains the updated value self.y."""
    ...
{code}
 

 

 

  was:
I’m thinking of an improvement for python listener and foreachBatch. Currently 
if you define a variable outside of the function, you can’t actually see it on 
client side if it’s touched in the function because the operation is on server 
side, e.g. 

 

 
{code:java}
x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(e):
        x = 100
        self.y = 200
spark.streams.addListener(MyListener())
q = spark.readStream.start()
 
# x is still 0, self.y is not defined
{code}
 

 

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle serialize the whole listener instance 
again, and send back to the client.

 

So if we define a new interface on the streaming query manager, maybe called 
refreshListener(listener), e.g. use it as `spark.streams.refreshListener()`

 

 
{code:java}
def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener
 # send the listener id with this refresh request, server receives this request 
and serializes the listener again, and send back to client, so the returned new 
listener contains the updated value self.y
 
{code}
 

 

 


> refreshListener() API on StreamingQueryManager for spark connect
> 
>
> Key: SPARK-44808
> URL: https://issues.apache.org/jira/browse/SPARK-44808
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> I’m thinking of an improvement for the Spark Connect Python listener and 
> foreachBatch. Currently, if you define a variable outside of the function, you 
> can’t actually see its updated value on the client side when the function 
> modifies it, because the function runs on the server side, e.g.:
>  
>  
> {code:python}
> x = 0
> class MyListener(StreamingQueryListener):
>     def onQueryStarted(self, e):
>         x = 100          # rebinds a local name; the client-side x is untouched
>         self.y = 200     # set only on the server-side copy of the listener
> spark.streams.addListener(MyListener())
> q = spark.readStream.start()
>  
> # x is still 0, self.y is not defined
> {code}
>  
>  
> But for the self.y case, there could be an improvement. 
>  
> The server side is capable of pickle-serializing the whole listener instance 
> again and sending it back to the client.
>  
> So we could define a new interface on the streaming query manager, maybe called 
> refreshListener(listener), used as `spark.streams.refreshListener()`:
>  
>  
> {code:python}
> def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener:
>     """Send the listener id with this refresh request; the server receives the
>     request, serializes the listener again, and sends it back to the client, so
>     the returned new listener contains the updated value self.y."""
>     ...
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect

2023-08-14 Thread Wei Liu (Jira)
Wei Liu created SPARK-44808:
---

 Summary: refreshListener() API on StreamingQueryManager for spark 
connect
 Key: SPARK-44808
 URL: https://issues.apache.org/jira/browse/SPARK-44808
 Project: Spark
  Issue Type: New Feature
  Components: Connect, Structured Streaming
Affects Versions: 4.0.0
Reporter: Wei Liu


I’m thinking of an improvement for the Python listener and foreachBatch. 
Currently, if you define a variable outside of the function, you can’t actually 
see its updated value on the client side when the function modifies it, because 
the function runs on the server side, e.g.:

```
x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, e):
        x = 100          # rebinds a local name; the client-side x is untouched
        self.y = 200     # set only on the server-side copy of the listener

spark.streams.addListener(MyListener())
q = spark.readStream.start()

# x is still 0, self.y is not defined
```

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle-serializing the whole listener instance 
again and sending it back to the client.

 

So we could define a new interface on the streaming query manager, maybe called 
refreshListener(listener), used as `spark.streams.refreshListener()`:




```
def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener:
    """Send the listener id with this refresh request; the server receives the
    request, serializes the listener again, and sends it back to the client, so
    the returned new listener contains the updated value self.y."""
    ...
```

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44808) refreshListener() API on StreamingQueryManager for spark connect

2023-08-14 Thread Wei Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Liu updated SPARK-44808:

Description: 
I’m thinking of an improvement for the Python listener and foreachBatch. 
Currently, if you define a variable outside of the function, you can’t actually 
see its updated value on the client side when the function modifies it, because 
the function runs on the server side, e.g.:

 

 
{code:python}
x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, e):
        x = 100          # rebinds a local name; the client-side x is untouched
        self.y = 200     # set only on the server-side copy of the listener
spark.streams.addListener(MyListener())
q = spark.readStream.start()
 
# x is still 0, self.y is not defined
{code}
 

 

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle-serializing the whole listener instance 
again and sending it back to the client.

 

So we could define a new interface on the streaming query manager, maybe called 
refreshListener(listener), used as `spark.streams.refreshListener()`:

 

 
{code:python}
def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener:
    """Send the listener id with this refresh request; the server receives the
    request, serializes the listener again, and sends it back to the client, so
    the returned new listener contains the updated value self.y."""
    ...
{code}
 

 

 

  was:
I’m thinking of an improvement for python listener and foreachBatch. Currently 
if you define a variable outside of the function, you can’t actually see it on 
client side if it’s touched in the function because the operation is on server 
side, e.g. 

```

x = 0
class MyListener(StreamingQueryListener):
    def onQueryStarted(e):
        x = 100
        self.y = 200

spark.streams.addListener(MyListener())
q = spark.readStream.start()

# x is still 0, self.y is not defined

```

But for the self.y case, there could be an improvement. 

 

The server side is capable of pickle serialize the whole listener instance 
again, and send back to the client.

 

So if we define a new interface on the streaming query manager, maybe called 
refreshListener(listener), e.g. use it as `spark.streams.refreshListener()`




```

def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener

 # send the listener id with this refresh request, server receives this request 
and serializes the listener again, and send back to client, so the returned new 
listener contains the updated value self.y

```

 

 


> refreshListener() API on StreamingQueryManager for spark connect
> 
>
> Key: SPARK-44808
> URL: https://issues.apache.org/jira/browse/SPARK-44808
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> I’m thinking of an improvement for the Python listener and foreachBatch. 
> Currently, if you define a variable outside of the function, you can’t actually 
> see its updated value on the client side when the function modifies it, because 
> the function runs on the server side, e.g.:
>  
>  
> {code:python}
> x = 0
> class MyListener(StreamingQueryListener):
>     def onQueryStarted(self, e):
>         x = 100          # rebinds a local name; the client-side x is untouched
>         self.y = 200     # set only on the server-side copy of the listener
> spark.streams.addListener(MyListener())
> q = spark.readStream.start()
>  
> # x is still 0, self.y is not defined
> {code}
>  
>  
> But for the self.y case, there could be an improvement. 
>  
> The server side is capable of pickle-serializing the whole listener instance 
> again and sending it back to the client.
>  
> So we could define a new interface on the streaming query manager, maybe called 
> refreshListener(listener), used as `spark.streams.refreshListener()`:
>  
>  
> {code:python}
> def refreshListener(listener: StreamingQueryListener) -> StreamingQueryListener:
>     """Send the listener id with this refresh request; the server receives the
>     request, serializes the listener again, and sends it back to the client, so
>     the returned new listener contains the updated value self.y."""
>     ...
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44764) Streaming process improvement

2023-08-10 Thread Wei Liu (Jira)
Wei Liu created SPARK-44764:
---

 Summary: Streaming process improvement
 Key: SPARK-44764
 URL: https://issues.apache.org/jira/browse/SPARK-44764
 Project: Spark
  Issue Type: New Feature
  Components: Connect, Structured Streaming
Affects Versions: 4.0.0
Reporter: Wei Liu


# Deduplicate or remove StreamingPythonRunner if possible; it is very similar 
to the existing PythonRunner
 # Use the DAEMON mode for starting a new process



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44712) Migrate ‎test_timedelta_ops assert_eq to use assertDataFrameEqual

2023-08-07 Thread Amanda Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amanda Liu updated SPARK-44712:
---
Description: Migrate assert_eq to assertDataFrameEqual in this file: 
[‎python/pyspark/pandas/tests/data_type_ops/test_timedelta_ops.py|https://github.com/databricks/runtime/blob/f579860299b16f6614f70c7cf2509cd89816d363/python/pyspark/pandas/tests/data_type_ops/test_timedelta_ops.py#L176]
  (was: Migrate assert_eq to assertDataFrameEqual in this file: 
[python/pyspark/pandas/tests/indexes/test_reset_index.py|https://github.com/apache/spark/blob/42e5daddf3ba16ff7d08e82e51cd8924cc56e180/python/pyspark/pandas/tests/indexes/test_reset_index.py#L46])
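For reference, this kind of migration replaces the assert_eq helper with 
PySpark's assertDataFrameEqual; a minimal sketch with made-up data (not taken 
from the test file):

{code:python}
from pyspark.testing import assertDataFrameEqual

# Illustrative only; column names and values are invented for this sketch.
expected = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "v"])
actual = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "v"])

# where the test previously used the pandas-on-Spark assert_eq helper,
# assertDataFrameEqual compares both schema and data of the two DataFrames
assertDataFrameEqual(actual, expected)
{code}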

> Migrate ‎test_timedelta_ops assert_eq to use assertDataFrameEqual
> -
>
> Key: SPARK-44712
> URL: https://issues.apache.org/jira/browse/SPARK-44712
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Amanda Liu
>Priority: Major
>
> Migrate assert_eq to assertDataFrameEqual in this file: 
> [‎python/pyspark/pandas/tests/data_type_ops/test_timedelta_ops.py|https://github.com/databricks/runtime/blob/f579860299b16f6614f70c7cf2509cd89816d363/python/pyspark/pandas/tests/data_type_ops/test_timedelta_ops.py#L176]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44712) Migrate ‎test_timedelta_ops assert_eq to use assertDataFrameEqual

2023-08-07 Thread Amanda Liu (Jira)
Amanda Liu created SPARK-44712:
--

 Summary: Migrate ‎test_timedelta_ops assert_eq to use 
assertDataFrameEqual
 Key: SPARK-44712
 URL: https://issues.apache.org/jira/browse/SPARK-44712
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 3.5.0
Reporter: Amanda Liu


Migrate assert_eq to assertDataFrameEqual in this file: 
[python/pyspark/pandas/tests/indexes/test_reset_index.py|https://github.com/apache/spark/blob/42e5daddf3ba16ff7d08e82e51cd8924cc56e180/python/pyspark/pandas/tests/indexes/test_reset_index.py#L46]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44711) Migrate test_series_conversion assert_eq to use assertDataFrameEqual

2023-08-07 Thread Amanda Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amanda Liu updated SPARK-44711:
---
Description: 
Migrate assert_eq to assertDataFrameEqual in this file: 
[‎python/pyspark/pandas/tests/test_series_conversion.py|https://github.com/databricks/runtime/blob/d162bd182de8bcea180d874027edb86ae8fc60e5/python/pyspark/pandas/tests/test_series_conversion.py#L63]

  was:Migrate assert_eq to assertDataFrameEqual in this file: 
[python/pyspark/pandas/tests/indexes/test_reset_index.py|https://github.com/apache/spark/blob/42e5daddf3ba16ff7d08e82e51cd8924cc56e180/python/pyspark/pandas/tests/indexes/test_reset_index.py#L46]


> Migrate test_series_conversion assert_eq to use assertDataFrameEqual
> 
>
> Key: SPARK-44711
> URL: https://issues.apache.org/jira/browse/SPARK-44711
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Amanda Liu
>Priority: Major
>
> Migrate assert_eq to assertDataFrameEqual in this file: 
> [‎python/pyspark/pandas/tests/test_series_conversion.py|https://github.com/databricks/runtime/blob/d162bd182de8bcea180d874027edb86ae8fc60e5/python/pyspark/pandas/tests/test_series_conversion.py#L63]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44711) Migrate test_series_conversion assert_eq to use assertDataFrameEqual

2023-08-07 Thread Amanda Liu (Jira)
Amanda Liu created SPARK-44711:
--

 Summary: Migrate test_series_conversion assert_eq to use 
assertDataFrameEqual
 Key: SPARK-44711
 URL: https://issues.apache.org/jira/browse/SPARK-44711
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 3.5.0
Reporter: Amanda Liu


Migrate assert_eq to assertDataFrameEqual in this file: 
[python/pyspark/pandas/tests/indexes/test_reset_index.py|https://github.com/apache/spark/blob/42e5daddf3ba16ff7d08e82e51cd8924cc56e180/python/pyspark/pandas/tests/indexes/test_reset_index.py#L46]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44708) Migrate test_reset_index assert_eq to use assertDataFrameEqual

2023-08-07 Thread Amanda Liu (Jira)
Amanda Liu created SPARK-44708:
--

 Summary: Migrate test_reset_index assert_eq to use 
assertDataFrameEqual
 Key: SPARK-44708
 URL: https://issues.apache.org/jira/browse/SPARK-44708
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 3.5.0
Reporter: Amanda Liu


Migrate assert_eq to assertDataFrameEqual in this file: 
[python/pyspark/pandas/tests/indexes/test_reset_index.py|https://github.com/apache/spark/blob/42e5daddf3ba16ff7d08e82e51cd8924cc56e180/python/pyspark/pandas/tests/indexes/test_reset_index.py#L46]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


