[ 
https://issues.apache.org/jira/browse/SPARK-43084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711505#comment-17711505
 ] 

Peng Zhong commented on SPARK-43084:
------------------------------------

Verified that UDFs work in streaming Spark Connect:

 
{code:python}
>>> from pyspark.sql.functions import col, udf
>>> def negative(value):
...   return -value
...
>>> negativeUDF = udf(lambda z: negative(z))
>>>
>>> query = (
...  spark
...  .readStream
...  .format("rate")
...  .option("numPartitions", "1")
...  .load()
...  .select(col("timestamp"), negativeUDF(col("value")).alias("negValue"))
...  .writeStream
...  .format("memory")
...  .queryName("rate_table")
...  .trigger(processingTime="10 seconds")
...  .start()
... )
>>>
>>> query.status
{'message': 'Waiting for next trigger', 'isDataAvailable': True, 'isTriggerActive': False}
>>>
>>> spark.sql("select * from rate_table").show()
+--------------------+--------+
|           timestamp|negValue|
+--------------------+--------+
|2023-04-12 11:04:...|       0|
|2023-04-12 11:04:...|      -1|
|2023-04-12 11:04:...|      -2|
|2023-04-12 11:04:...|      -3|
|2023-04-12 11:04:...|      -4|
|2023-04-12 11:04:...|      -5|
|2023-04-12 11:04:...|      -6|
|2023-04-12 11:04:...|      -7|
|2023-04-12 11:04:...|      -8|
|2023-04-12 11:05:...|      -9|
|2023-04-12 11:05:...|     -10|
|2023-04-12 11:05:...|     -11|
|2023-04-12 11:05:...|     -12|
|2023-04-12 11:05:...|     -13|
|2023-04-12 11:05:...|     -14|
|2023-04-12 11:05:...|     -15|
|2023-04-12 11:05:...|     -16|
|2023-04-12 11:05:...|     -17|
+--------------------+--------+
>>>
{code}

> Add Python state API (applyInPandasWithState) and verify UDFs
> -------------------------------------------------------------
>
>                 Key: SPARK-43084
>                 URL: https://issues.apache.org/jira/browse/SPARK-43084
>             Project: Spark
>          Issue Type: Task
>          Components: Connect, Structured Streaming
>    Affects Versions: 3.5.0
>         Environment: * Add Python state API (applyInPandasWithState) to streaming Spark-connect.
>  * Verify the UDFs work (it may not need any code changes).
>            Reporter: Raghu Angadi
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
