[jira] [Updated] (SPARK-47740) Stop JVM by calling SparkSession.stop from PySpark

2024-04-05 Thread Maxim Martynov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Martynov updated SPARK-47740:
---
Description: 
{code:python}
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.driver.memory", "2g")
    .config("spark.another.property", "hi")
    .getOrCreate()
)
spark.conf.get("spark.driver.memory")
# prints '2g'
spark.conf.get("spark.another.property")
# prints 'hi'

# check for the JVM process before and after stop
spark.stop()

# try to recreate the Spark session with a different driver memory,
# and no "spark.another.property" at all
spark = SparkSession.builder.config("spark.driver.memory", "3g").getOrCreate()
spark.conf.get("spark.driver.memory")
# prints '3g'
spark.conf.get("spark.another.property")
# prints 'hi'

# check for the JVM process again
{code}
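
To do the "check for JVM process" steps above, something along these lines can be run from Python (a sketch; {{pgrep}} and the {{SparkSubmit}} pattern are assumptions based on the driver command line shown below):

{code:python}
import subprocess

# List driver JVMs spawned by pyspark-shell; the full command line contains
# org.apache.spark.deploy.SparkSubmit while the JVM is alive.
print(subprocess.run(["pgrep", "-af", "SparkSubmit"],
                     capture_output=True, text=True).stdout)
{code}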

After the first call of {{.getOrCreate()}}, a JVM process with the following options is started:
{code}
maxim  76625 10.9  2.7 7975292 447068 pts/8  Sl+  16:59   0:11 
/home/maxim/.sdkman/candidates/java/current/bin/java -cp 
/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/conf:/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/jars/*
 -Xmx2g -XX:+IgnoreUnrecognizedVMOptions 
--add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED 
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED 
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED 
--add-opens=java.base/sun.security.action=ALL-UNNAMED 
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED 
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED 
-Djdk.reflect.useDirectMethodHandle=false org.apache.spark.deploy.SparkSubmit 
--conf spark.driver.memory=2g --conf spark.another.property=hi pyspark-shell
{code}

But the JVM has not been stopped by the {{.stop()}} method call. The new Spark 
session, which reports 3 GB of driver memory, actually still runs with the old 
2 GB heap, and sees config options from the old session, which should have been 
closed at this point.

Summarizing:
* {{spark.stop()}} stops only the SparkContext, but not the JVM
* {{spark.stop()}} does not clean up the session config

This behavior has been observed for a long time, at least since 2.2.0, and is 
still present in 3.5.1.
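
One quick way to confirm this from Python (a sketch that assumes the local {{pyspark-shell}} setup from the example above) is to ask the still-running JVM for its real heap limit after the second {{getOrCreate()}}:

{code:python}
# Run after spark.stop() and the second getOrCreate() above.
# The driver JVM is the same process, so its heap is still capped by the
# original -Xmx2g, no matter what spark.conf.get("spark.driver.memory") reports.
runtime = spark._jvm.java.lang.Runtime.getRuntime()
print(runtime.maxMemory() // (1024 * 1024))  # well below 3072 MiB
{code}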

This could be solved by stopping the JVM after the Spark session is stopped.
But just calling {{spark._jvm.System.exit(0)}} will fail, because the py4j socket 
will be closed and it will become impossible to start a new Spark session:
{code:python}
spark._jvm.System.exit(0)
{code}

{code}
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling z:java.lang.System.exit
{code}

Here is an article (in Russian) describing the same issue and providing a safe way 
to stop the JVM without breaking py4j:
https://habr.com/ru/companies/sberbank/articles/805285/

{code:python}
from pyspark.sql import SparkSession
from pyspark import SparkContext
from threading import RLock
from py4j.protocol import CALL_COMMAND_NAME, END_COMMAND_PART


from pyspark.sql import SparkSession
from pyspark 
{code}
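
A minimal sketch of the same idea (an illustration only, not the article's code; it relies on PySpark's private {{SparkContext._gateway}} / {{SparkContext._jvm}} attributes and assumes a local {{pyspark-shell}} deployment): stop the session, ask the JVM to exit while swallowing the expected py4j error, then reset the cached gateway so the next {{getOrCreate()}} launches a fresh JVM.

{code:python}
from py4j.protocol import Py4JError

from pyspark import SparkContext
from pyspark.sql import SparkSession


def stop_session_and_jvm(spark: SparkSession) -> None:
    """Stop the session, kill the driver JVM and reset PySpark's cached gateway.

    Relies on private attributes, so this is a sketch of the approach rather
    than a supported API.
    """
    gateway = SparkContext._gateway
    spark.stop()
    try:
        # The JVM exits before py4j receives an answer, so this always raises.
        spark._jvm.System.exit(0)
    except Py4JError:
        pass
    try:
        # Close the now-dead py4j connections on the Python side.
        gateway.shutdown()
    except Exception:
        pass
    # Allow the next SparkContext to launch a fresh gateway and JVM.
    SparkContext._gateway = None
    SparkContext._jvm = None
{code}

With the cached gateway cleared, a subsequent {{SparkSession.builder...getOrCreate()}} should start a new JVM, so the new driver memory and a clean session config actually take effect.
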
[jira] [Commented] (SPARK-47740) Stop JVM by calling SparkSession.stop from PySpark

2024-04-05 Thread Maxim Martynov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-47740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834379#comment-17834379
 ] 

Maxim Martynov commented on SPARK-47740:


Here is a PR adding a helper function 
[jvm_shutdown(gateway)|https://github.com/py4j/py4j/pull/541] to Py4J.

> Stop JVM by calling SparkSession.stop from PySpark
> --
>
> Key: SPARK-47740
> URL: https://issues.apache.org/jira/browse/SPARK-47740
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 3.5.1
>Reporter: Maxim Martynov
>Priority: Major
>

[jira] [Created] (SPARK-47740) Stop JVM by calling SparkSession.stop from PySpark

2024-04-05 Thread Maxim Martynov (Jira)
Maxim Martynov created SPARK-47740:
--

 Summary: Stop JVM by calling SparkSession.stop from PySpark
 Key: SPARK-47740
 URL: https://issues.apache.org/jira/browse/SPARK-47740
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 3.5.1
Reporter: Maxim Martynov



[jira] [Comment Edited] (SPARK-33628) Use the Hive.getPartitionsByNames method instead of Hive.getPartitions in the HiveClientImpl

2023-08-30 Thread Maxim Martynov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17760281#comment-17760281
 ] 

Maxim Martynov edited comment on SPARK-33628 at 8/30/23 8:53 AM:
-

Fixed in SPARK-42480, issue can be closed


was (Author: JIRAUSER283764):
Fixed in SPARK-42480

> Use the Hive.getPartitionsByNames method instead of Hive.getPartitions in the 
> HiveClientImpl
> 
>
> Key: SPARK-33628
> URL: https://issues.apache.org/jira/browse/SPARK-33628
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0, 3.0.1
>Reporter: jinhai
>Priority: Major
> Attachments: image-2020-12-02-16-57-43-619.png, 
> image-2020-12-03-14-38-19-221.png
>
>
> When partitions are tracked by the catalog, this will compute all custom 
> partition locations, especially with dynamic partitions, when the field 
> staticPartitions is empty.
> The poor performance of the method listPartitions results in a long period 
> of no response at the Driver.
> When reading 12253 partitions, the method getPartitionsByNames takes 2 seconds, 
> while getPartitions takes 457 seconds, nearly 8 minutes.
> !image-2020-12-02-16-57-43-619.png|width=783,height=54!






[jira] (SPARK-33628) Use the Hive.getPartitionsByNames method instead of Hive.getPartitions in the HiveClientImpl

2023-08-30 Thread Maxim Martynov (Jira)


[ https://issues.apache.org/jira/browse/SPARK-33628 ]


Maxim Martynov deleted comment on SPARK-33628:


was (Author: JIRAUSER283764):
Can anyone review this pull request?




[jira] [Commented] (SPARK-33628) Use the Hive.getPartitionsByNames method instead of Hive.getPartitions in the HiveClientImpl

2023-08-30 Thread Maxim Martynov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17760281#comment-17760281
 ] 

Maxim Martynov commented on SPARK-33628:


Fixed in SPARK-42480




[jira] [Updated] (SPARK-44774) SaveMode.ErrorIfExists does not work with kafka-sql

2023-08-11 Thread Maxim Martynov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Martynov updated SPARK-44774:
---
Description: 
I'm trying to write a batch dataframe to a Kafka topic with {{mode="error"}}, but 
when the topic exists it does not raise an exception. Instead, it appends data to 
the topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
    - 9093:9093
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'no'
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
    - zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create a DataFrame and write it to Kafka. First write using {{mode="append"}} 
to create the topic, then with {{mode="error"}}, which should raise an error 
because the topic already exists:
{code}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("append").save()

# no exception is raised
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("error").save()
{code}

4. Check the topic content - 2 rows have been added to the topic instead of one:
{code:python}
spark.read.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("subscribe", "new_topic").load().show(10, False)
{code}
{code}
+----+-------------------+---------+---------+------+-----------------------+-------------+
|key |value              |topic    |partition|offset|timestamp              |timestampType|
+----+-------------------+---------+---------+------+-----------------------+-------------+
|null|[73 74 72 69 6E 67]|new_topic|0        |0     |2023-08-11 09:39:35.813|0            |
|null|[73 74 72 69 6E 67]|new_topic|0        |1     |2023-08-11 09:39:36.122|0            |
+----+-------------------+---------+---------+------+-----------------------+-------------+
{code}

It looks like the mode is validated by KafkaSourceProvider, but is never actually used:
https://github.com/apache/spark/blob/v3.4.1/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178

So data is always appended to the topic.
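
Until this is fixed, {{ErrorIfExists}} semantics have to be enforced by the caller. A minimal sketch of such a guard (an illustration only; the topic-existence check assumes the separate {{kafka-python}} package, which is not part of Spark):

{code:python}
from kafka import KafkaConsumer  # assumption: kafka-python is installed


def write_if_topic_absent(df, bootstrap_servers: str, topic: str) -> None:
    # Manual ErrorIfExists: refuse to write when the topic already exists,
    # because the Kafka sink itself appends regardless of the save mode.
    existing_topics = KafkaConsumer(bootstrap_servers=bootstrap_servers).topics()
    if topic in existing_topics:
        raise ValueError(f"Topic {topic!r} already exists, refusing to write")
    (
        df.write.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("topic", topic)
        .save()
    )
{code}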

  was:
I' trying to write batch dataframe to Kafka topic with {{mode="error"}}, but 
when topic exists it does not raise exception. Instead it appends data to a 
topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
image: bitnami/zookeeper:3.8
environment:
  ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
image: bitnami/kafka:latest
restart: unless-stopped
ports:
- 9093:9093
environment:
  ALLOW_PLAINTEXT_LISTENER: 'yes'
  KAFKA_ENABLE_KRAFT: 'no'
  KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
  KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
  KAFKA_CFG_LISTENERS: 
INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
  KAFKA_CFG_ADVERTISED_LISTENERS: 
INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
  KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 
INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
  KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
depends_on:
- zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create DataFrame and write it to Kafka. First write using {{mode="append"}} 
to create topic, then with {{mode="error"}} to raise because topic already 
exist:
{code}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("append").save()

# no exception is 

[jira] [Updated] (SPARK-44774) SaveMode.ErrorIfExists does not work with kafka-sql

2023-08-11 Thread Maxim Martynov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Martynov updated SPARK-44774:
---
Description: 
I' trying to write batch dataframe to Kafka topic with {{mode="error"}}, but 
when topic exists it does not raise exception. Instead it appends data to a 
topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
image: bitnami/zookeeper:3.8
environment:
  ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
image: bitnami/kafka:latest
restart: unless-stopped
ports:
- 9093:9093
environment:
  ALLOW_PLAINTEXT_LISTENER: 'yes'
  KAFKA_ENABLE_KRAFT: 'yes'
  KAFKA_CLIENT_USERS: onetl
  KAFKA_CLIENT_PASSWORDS: uufoFae9sahSoidoo0eagaidaoreif6z
  KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
  KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
  KAFKA_CFG_LISTENERS: 
INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
  KAFKA_CFG_ADVERTISED_LISTENERS: 
INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
  KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 
INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
  KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
depends_on:
- zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create DataFrame and write it to Kafka. First write using {{mode="append"}} 
to create topic, then with {{mode="error"}} to raise because topic already 
exist:
{code}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("append").save()

# no exception is raised
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("error").save()
{code}

4. Check topic content - 2 rows are added to topic instead of one:
{code:python}
spark.read.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("subscribe", "new_topic").load().show(10, False)
{code}
{code}
++---+-+-+--+---+-+
|key |value  |topic|partition|offset|timestamp  
|timestampType|
++---+-+-+--+---+-+
|null|[73 74 72 69 6E 67]|new_topic|0|0 |2023-08-11 09:39:35.813|0  
  |
|null|[73 74 72 69 6E 67]|new_topic|0|1 |2023-08-11 09:39:36.122|0  
  |
++---+-+-+--+---+-+
{code}

It looks like mode is checked by KafkaSourceProvider, but is not used at all:
https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178

So data is always appended to topic.

  was:
I' trying to write batch dataframe to Kafka topic with {{mode="error"}}, but 
when topic exists it does not raise exception - instead it appends data to a 
topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
image: bitnami/zookeeper:3.8
environment:
  ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
image: bitnami/kafka:latest
restart: unless-stopped
ports:
- 9093:9093
environment:
  ALLOW_PLAINTEXT_LISTENER: 'yes'
  KAFKA_ENABLE_KRAFT: 'yes'
  KAFKA_CLIENT_USERS: onetl
  KAFKA_CLIENT_PASSWORDS: uufoFae9sahSoidoo0eagaidaoreif6z
  KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
  KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
  KAFKA_CFG_LISTENERS: 
INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
  KAFKA_CFG_ADVERTISED_LISTENERS: 
INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
  KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 
INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
  KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
depends_on:
- zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create DataFrame and write it to Kafka. First write using {{mode="append"}} 
to create topic, then with {{mode="error"}} to raise because topic 

[jira] [Updated] (SPARK-44774) SaveMode.ErrorIfExists does not work with kafka-sql

2023-08-11 Thread Maxim Martynov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Martynov updated SPARK-44774:
---
Description: 
I' trying to write batch dataframe to Kafka topic with {{mode="error"}}, but 
when topic exists it does not raise exception. Instead it appends data to a 
topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
image: bitnami/zookeeper:3.8
environment:
  ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
image: bitnami/kafka:latest
restart: unless-stopped
ports:
- 9093:9093
environment:
  ALLOW_PLAINTEXT_LISTENER: 'yes'
  KAFKA_ENABLE_KRAFT: 'yes'
  KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
  KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
  KAFKA_CFG_LISTENERS: 
INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
  KAFKA_CFG_ADVERTISED_LISTENERS: 
INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
  KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 
INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
  KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
depends_on:
- zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create DataFrame and write it to Kafka. First write using {{mode="append"}} 
to create topic, then with {{mode="error"}} to raise because topic already 
exist:
{code}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("append").save()

# no exception is raised
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("error").save()
{code}

4. Check topic content - 2 rows are added to topic instead of one:
{code:python}
spark.read.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("subscribe", "new_topic").load().show(10, False)
{code}
{code}
++---+-+-+--+---+-+
|key |value  |topic|partition|offset|timestamp  
|timestampType|
++---+-+-+--+---+-+
|null|[73 74 72 69 6E 67]|new_topic|0|0 |2023-08-11 09:39:35.813|0  
  |
|null|[73 74 72 69 6E 67]|new_topic|0|1 |2023-08-11 09:39:36.122|0  
  |
++---+-+-+--+---+-+
{code}

It looks like mode is checked by KafkaSourceProvider, but is not used at all:
https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178

So data is always appended to topic.

  was:
I' trying to write batch dataframe to Kafka topic with {{mode="error"}}, but 
when topic exists it does not raise exception. Instead it appends data to a 
topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
    - 9093:9093
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'yes'
      KAFKA_CLIENT_USERS: onetl
      KAFKA_CLIENT_PASSWORDS: uufoFae9sahSoidoo0eagaidaoreif6z
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
    - zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create a DataFrame and write it to Kafka. First write using {{mode="append"}} to create the topic, then with {{mode="error"}}, which should raise an exception because the topic already exists:
{code}
df = spark.createDataFrame([{"value": "string"}])

[jira] [Updated] (SPARK-44774) SaveMode.ErrorIfExists does not work with kafka-sql

2023-08-11 Thread Maxim Martynov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Martynov updated SPARK-44774:
---
Description: 
I'm trying to write a batch dataframe to a Kafka topic with {{mode="error"}}, but when the topic already exists no exception is raised. Instead the data is appended to the topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
    - 9093:9093
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'no'
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
    - zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create a DataFrame and write it to Kafka. First write using {{mode="append"}} to create the topic, then with {{mode="error"}}, which should raise an exception because the topic already exists:
{code}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("append").save()

# no exception is raised
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("error").save()
{code}

4. Check the topic content - 2 rows were added to the topic instead of one:
{code:python}
spark.read.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("subscribe", "new_topic").load().show(10, False)
{code}
{code}
+----+-------------------+---------+---------+------+-----------------------+-------------+
|key |value              |topic    |partition|offset|timestamp              |timestampType|
+----+-------------------+---------+---------+------+-----------------------+-------------+
|null|[73 74 72 69 6E 67]|new_topic|0        |0     |2023-08-11 09:39:35.813|0            |
|null|[73 74 72 69 6E 67]|new_topic|0        |1     |2023-08-11 09:39:36.122|0            |
+----+-------------------+---------+---------+------+-----------------------+-------------+
{code}

It looks like the save mode is validated by KafkaSourceProvider, but is never actually used:
https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178

So the data is always appended to the topic.

  was:
I'm trying to write a batch dataframe to a Kafka topic with {{mode="error"}}, but when the topic already exists no exception is raised. Instead the data is appended to the topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
    - 9093:9093
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'yes'
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
    - zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create a DataFrame and write it to Kafka. First write using {{mode="append"}} to create the topic, then with {{mode="error"}}, which should raise an exception because the topic already exists:
{code}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", 

[jira] [Created] (SPARK-44774) SaveMode.ErrorIfExists does not work with kafka-sql

2023-08-11 Thread Maxim Martynov (Jira)
Maxim Martynov created SPARK-44774:
--

 Summary: SaveMode.ErrorIfExists does not work with kafka-sql
 Key: SPARK-44774
 URL: https://issues.apache.org/jira/browse/SPARK-44774
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.1
Reporter: Maxim Martynov


I'm trying to write a batch dataframe to a Kafka topic with {{mode="error"}}, but when the topic already exists no exception is raised. Instead the data is appended to the topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
    - 9093:9093
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'yes'
      KAFKA_CLIENT_USERS: onetl
      KAFKA_CLIENT_PASSWORDS: uufoFae9sahSoidoo0eagaidaoreif6z
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
    - zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create a DataFrame and write it to Kafka. First write using {{mode="append"}} to create the topic, then with {{mode="error"}}, which should raise an exception because the topic already exists:
{code}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("append").save()

# no exception is raised
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("error").save()
{code}

4. Check the topic content - 2 rows were added to the topic instead of one:
{code:python}
spark.read.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("subscribe", "new_topic").load().show(10, False)
{code}
{code}
+----+-------------------+---------+---------+------+-----------------------+-------------+
|key |value              |topic    |partition|offset|timestamp              |timestampType|
+----+-------------------+---------+---------+------+-----------------------+-------------+
|null|[73 74 72 69 6E 67]|new_topic|0        |0     |2023-08-11 09:39:35.813|0            |
|null|[73 74 72 69 6E 67]|new_topic|0        |1     |2023-08-11 09:39:36.122|0            |
+----+-------------------+---------+---------+------+-----------------------+-------------+
{code}

It looks like the save mode is validated by KafkaSourceProvider, but is never actually used:
https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178

So the data is always appended to the topic.






[jira] [Commented] (SPARK-33628) Use the Hive.getPartitionsByNames method instead of Hive.getPartitions in the HiveClientImpl

2023-03-02 Thread Maxim Martynov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695654#comment-17695654
 ] 

Maxim Martynov commented on SPARK-33628:


Can anyone review this pull request?

> Use the Hive.getPartitionsByNames method instead of Hive.getPartitions in the 
> HiveClientImpl
> 
>
> Key: SPARK-33628
> URL: https://issues.apache.org/jira/browse/SPARK-33628
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0, 3.0.1
>Reporter: jinhai
>Priority: Major
> Attachments: image-2020-12-02-16-57-43-619.png, 
> image-2020-12-03-14-38-19-221.png
>
>
> When partitions are tracked by the catalog, Spark computes all custom partition locations, especially for dynamic partition writes where the staticPartitions field is empty.
> The poor performance of the listPartitions method results in a long period with no response at the Driver.
> When reading 12253 partitions, the getPartitionsByNames method takes 2 seconds, while getPartitions takes 457 seconds, nearly 8 minutes.
> !image-2020-12-02-16-57-43-619.png|width=783,height=54!


