[jira] [Updated] (SPARK-37201) Spark SQL reads unnecessary nested fields (filter after explode)

2021-11-03 Thread Sergey Kotlov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Kotlov updated SPARK-37201:
--
Description: 
In this example, reading unnecessary nested fields still happens.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
 v2 and v3 columns aren't needed here, but still exist in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
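For anyone reproducing this, here is a minimal sketch (assuming the spark session and the "table" created above) that checks the scan schema programmatically instead of eyeballing the explain() output; v2/v3 appearing in the plan text indicates the unwanted read:
{code:java}
// Minimal sketch: inspect the executed plan text for the leaked nested fields.
// Assumes the SparkSession `spark` and the "table" created in the snippet above.
import org.apache.spark.sql.functions.explode
import spark.implicits._

val plan = spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .queryExecution.executedPlan.toString

// On affected versions the ReadSchema still mentions v2 and v3.
println(plan.contains("v2") && plan.contains("v3"))
{code}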
If you just remove the _filter_ or move _explode_ into a second _select_, everything is fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
 

*Yet another example: left_anti join after double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")
val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
Instead of the first select, other kinds of manipulation of the original df, for example {color:#00875a}.withColumn("field3", lit("f3")){color} or {color:#00875a}.drop("field2"){color}, will also lead to reading unnecessary nested fields from _struct_.
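For illustration, a sketch of the {color:#00875a}.drop("field2"){color} variant (same table and joinDf as above); per the behaviour described, it still produces the over-wide read schema:
{code:java}
// Sketch of the .drop variant mentioned above, using the table and joinDf
// defined in the previous snippet; the struct fields v2/v3 are still read.
spark.table("table")
  .drop("field2")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}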

But if you just remove the first select or change the join type, reading of nested fields will be correct:
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in a real system it might be part of a common API that other parts of the system use, adding their own expressions on top of it.
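A hypothetical illustration of such a common API (the helper name is made up for this sketch): one shared function returns a pre-projected DataFrame, and callers put their own expressions on top of it, reproducing the double-select shape from the example:
{code:java}
// Hypothetical shared helper: the "common api" does a broad projection once...
import org.apache.spark.sql.DataFrame

def eventsApi(): DataFrame =
  spark.table("table").select("struct", "field1")

// ...and a caller narrows it further and joins, ending up with the same
// select-then-select-then-left_anti pattern as in the example above.
eventsApi()
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)
{code}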

  was:
In this example, reading unnecessary nested fields still happens.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
 v2 and v3 columns aren't needed here, but still exist in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove _filter_ or move _explode_ to second _select_, everything is 
fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
 

*Yet another example: left_anti join after double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")
val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
Instead of the first select, it can be other types of manipulations with the 
original df, for example {{^~.withColumn("field3", lit("f3"))~^}} or 
.drop("field2"), which will also lead to reading unnecessary nested fields from 
_struct_.

But if you just remove the first select or change type of join, reading nested 
fields will be correct:**
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in 
a real system, it might be part of a common api, that other parts of the system 
use with their own expressions on top of this api.


> Spark SQL 

[jira] [Updated] (SPARK-37201) Spark SQL reads unnecessary nested fields (filter after explode)

2021-11-03 Thread Sergey Kotlov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Kotlov updated SPARK-37201:
--
Description: 
In this example, reading unnecessary nested fields still happens.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
 v2 and v3 columns aren't needed here, but still exist in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove _filter_ or move _explode_ to second _select_, everything is 
fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
 

*Yet another example: left_anti join after double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")
val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
Instead of the first select, it can be other types of manipulations with the 
original df, for example {{^~.withColumn("field3", lit("f3"))~^}} or 
.drop("field2"), which will also lead to reading unnecessary nested fields from 
_struct_.

But if you just remove the first select or change type of join, reading nested 
fields will be correct:**
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in 
a real system, it might be part of a common api, that other parts of the system 
use with their own expressions on top of this api.

  was:
In this example, reading unnecessary nested fields still happens.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
 v2 and v3 columns aren't needed here, but still exist in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove _filter_ or move _explode_ to second _select_, everything is 
fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
 

*Yet another example: left_anti join after double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")
val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
If you just remove the first select or change type of join, reading nested 
fields will be correct:**
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in 
a real system, it might be part of a common api, that other parts of the system 
use with their own expressions on top of this api.


> Spark SQL reads unnecessary nested fields (filter after explode)
> 
>
> Key: SPARK-37201
> URL: https://issues.apache.org/jira/browse/SPARK-37201
> Project: Spark
>  Issue 

[jira] [Updated] (SPARK-37201) Spark SQL reads unnecessary nested fields (filter after explode)

2021-11-03 Thread Sergey Kotlov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Kotlov updated SPARK-37201:
--
Description: 
In this example, reading unnecessary nested fields still happens.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
 v2 and v3 columns aren't needed here, but still exist in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove _filter_ or move _explode_ to second _select_, everything is 
fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
 

*Yet another example: left_anti join after double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")
val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
If you just remove the first select or change type of join, reading nested 
fields will be correct:**
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in 
a real system, it might be part of a common api, that other parts of the system 
use with their own expressions on top of this api.

  was:
In this example, reading unnecessary nested fields still happens.

Data preparation:

 
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
 

v2 and v3 columns aren't needed here, but still exist in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove _filter_ or move _explode_ to second _select_, everything is 
fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}


> Spark SQL reads unnecessary nested fields (filter after explode)
> 
>
> Key: SPARK-37201
> URL: https://issues.apache.org/jira/browse/SPARK-37201
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Sergey Kotlov
>Priority: Major
>
> In this example, reading unnecessary nested fields still happens.
> Data preparation:
> {code:java}
> case class Struct(v1: String, v2: String, v3: String)
> case class Event(struct: Struct, array: Seq[String])
> Seq(
>   Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
> ).toDF().write.mode("overwrite").saveAsTable("table")
> {code}
>  v2 and v3 columns aren't needed here, but still exist in the physical plan.
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>  
> == Physical Plan ==
> ... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>
> {code}
> If you just remove _filter_ or move _explode_ to second _select_, everything 
> is fine:
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   //.filter($"el" === "cx1")
>   .explain(true)
>   
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> spark.table("table")
>   .select($"struct.v1", $"array")
>   .select($"v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>   
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> {code}
>  
> *Yet another example: left_anti join after double 

[jira] [Updated] (SPARK-37201) Spark SQL reads unnecessary nested fields (filter after explode)

2021-11-02 Thread Sergey Kotlov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Kotlov updated SPARK-37201:
--
Summary: Spark SQL reads unnecessary nested fields (filter after explode)  
(was: Spark SQL reads unnecсessary nested fields (filter after explode))

> Spark SQL reads unnecessary nested fields (filter after explode)
> 
>
> Key: SPARK-37201
> URL: https://issues.apache.org/jira/browse/SPARK-37201
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Sergey Kotlov
>Priority: Major
>
> In this example, reading unnecessary nested fields still happens.
> Data preparation:
>  
> {code:java}
> case class Struct(v1: String, v2: String, v3: String)
> case class Event(struct: Struct, array: Seq[String])
> Seq(
>   Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
> ).toDF().write.mode("overwrite").saveAsTable("table")
> {code}
>  
> v2 and v3 columns aren't needed here, but still exist in the physical plan.
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>  
> == Physical Plan ==
> ... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>
> {code}
> If you just remove _filter_ or move _explode_ to second _select_, everything 
> is fine:
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   //.filter($"el" === "cx1")
>   .explain(true)
>   
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> spark.table("table")
>   .select($"struct.v1", $"array")
>   .select($"v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>   
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> {code}






[jira] [Updated] (SPARK-37201) Spark SQL reads unnecсessary nested fields (filter after explode)

2021-11-02 Thread Sergey Kotlov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Kotlov updated SPARK-37201:
--
Summary: Spark SQL reads unnecсessary nested fields (filter after explode)  
(was: Spark SQL reads unnecessary nested fields (filter after explode))

> Spark SQL reads unnecсessary nested fields (filter after explode)
> -
>
> Key: SPARK-37201
> URL: https://issues.apache.org/jira/browse/SPARK-37201
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Sergey Kotlov
>Priority: Major
>
> In this example, reading unnecessary nested fields still happens.
> Data preparation:
>  
> {code:java}
> case class Struct(v1: String, v2: String, v3: String)
> case class Event(struct: Struct, array: Seq[String])
> Seq(
>   Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
> ).toDF().write.mode("overwrite").saveAsTable("table")
> {code}
>  
> v2 and v3 columns aren't needed here, but still exist in the physical plan.
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>  
> == Physical Plan ==
> ... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>
> {code}
> If you just remove _filter_ or move _explode_ to second _select_, everything 
> is fine:
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   //.filter($"el" === "cx1")
>   .explain(true)
>   
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> spark.table("table")
>   .select($"struct.v1", $"array")
>   .select($"v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>   
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> {code}






[jira] [Created] (SPARK-37201) Spark SQL reads unnecessary nested fields (filter after explode)

2021-11-02 Thread Sergey Kotlov (Jira)
Sergey Kotlov created SPARK-37201:
-

 Summary: Spark SQL reads unnecessary nested fields (filter after 
explode)
 Key: SPARK-37201
 URL: https://issues.apache.org/jira/browse/SPARK-37201
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Sergey Kotlov


In this example, reading unnecessary nested fields still happens.

Data preparation:

 
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
 

v2 and v3 columns aren't needed here, but still exist in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove _filter_ or move _explode_ to second _select_, everything is 
fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}






[jira] [Commented] (SPARK-34674) Spark app on k8s doesn't terminate without call to sparkContext.stop() method

2021-04-07 Thread Sergey Kotlov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17316697#comment-17316697
 ] 

Sergey Kotlov commented on SPARK-34674:
---

The fix that I currently use:  [https://github.com/apache/spark/pull/32081]

> Spark app on k8s doesn't terminate without call to sparkContext.stop() method
> -
>
> Key: SPARK-34674
> URL: https://issues.apache.org/jira/browse/SPARK-34674
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.1.1
>Reporter: Sergey Kotlov
>Priority: Major
>
> Hello!
>  I have run into a problem that if I don't call the method 
> sparkContext.stop() explicitly, then a Spark driver process doesn't terminate 
> even after its Main method has been completed. This behaviour is different 
> from spark on yarn, where the manual sparkContext stopping is not required.
>  It looks like, the problem is in using non-daemon threads, which prevent the 
> driver jvm process from terminating.
>  At least I see two non-daemon threads, if I don't call sparkContext.stop():
> {code:java}
> Thread[OkHttp kubernetes.default.svc,5,main]
> Thread[OkHttp kubernetes.default.svc Writer,5,main]
> {code}
> Could you tell please, if it is possible to solve this problem?
> Docker image from the official release of spark-3.1.1 hadoop3.2 is used.
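A minimal sketch (plain JVM API, nothing Spark-specific) of how one might list the non-daemon threads that keep the driver alive after main() returns, to confirm the OkHttp threads quoted above:
{code:java}
// List live non-daemon threads; on the affected k8s setup this reportedly
// includes the two "OkHttp kubernetes.default.svc" threads mentioned in the issue.
Thread.getAllStackTraces.keySet.forEach { t =>
  if (t.isAlive && !t.isDaemon) println(t)
}
{code}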






[jira] [Commented] (SPARK-34674) Spark app on k8s doesn't terminate without call to sparkContext.stop() method

2021-03-28 Thread Sergey Kotlov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17310235#comment-17310235
 ] 

Sergey Kotlov commented on SPARK-34674:
---

Thanks for looking into this problem, [~dongjoon].

As a temporary solution suitable for my purposes, I just added a finally block 
that closes SparkContext to [this 
place|https://github.com/apache/spark/blob/066c055b526aff5d2c0ace176f8eebf0eeab6f13/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L950].
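The linked change patches SparkSubmit itself; at the application level, a minimal sketch of the equivalent workaround (a hypothetical job skeleton, not the actual patch) is simply stopping the session in a finally block:
{code:java}
// Hypothetical application-level workaround: always stop the SparkSession,
// so the non-daemon OkHttp threads are shut down and the driver JVM can exit.
import org.apache.spark.sql.SparkSession

object MyJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("my-job").getOrCreate()
    try {
      // job logic goes here
    } finally {
      spark.stop()
    }
  }
}
{code}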

> Spark app on k8s doesn't terminate without call to sparkContext.stop() method
> -
>
> Key: SPARK-34674
> URL: https://issues.apache.org/jira/browse/SPARK-34674
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.1.1
>Reporter: Sergey Kotlov
>Priority: Major
>
> Hello!
>  I have run into a problem that if I don't call the method 
> sparkContext.stop() explicitly, then a Spark driver process doesn't terminate 
> even after its Main method has been completed. This behaviour is different 
> from spark on yarn, where the manual sparkContext stopping is not required.
>  It looks like, the problem is in using non-daemon threads, which prevent the 
> driver jvm process from terminating.
>  At least I see two non-daemon threads, if I don't call sparkContext.stop():
> {code:java}
> Thread[OkHttp kubernetes.default.svc,5,main]
> Thread[OkHttp kubernetes.default.svc Writer,5,main]
> {code}
> Could you tell please, if it is possible to solve this problem?
> Docker image from the official release of spark-3.1.1 hadoop3.2 is used.






[jira] [Commented] (SPARK-34674) Spark app on k8s doesn't terminate without call to sparkContext.stop() method

2021-03-23 Thread Sergey Kotlov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307398#comment-17307398
 ] 

Sergey Kotlov commented on SPARK-34674:
---

I saw it in the sources. But you are right, generally I should not rely on functionality that is not described in the official documentation.

> Spark app on k8s doesn't terminate without call to sparkContext.stop() method
> -
>
> Key: SPARK-34674
> URL: https://issues.apache.org/jira/browse/SPARK-34674
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.1.1
>Reporter: Sergey Kotlov
>Priority: Major
>
> Hello!
>  I have run into a problem that if I don't call the method 
> sparkContext.stop() explicitly, then a Spark driver process doesn't terminate 
> even after its Main method has been completed. This behaviour is different 
> from spark on yarn, where the manual sparkContext stopping is not required.
>  It looks like, the problem is in using non-daemon threads, which prevent the 
> driver jvm process from terminating.
>  At least I see two non-daemon threads, if I don't call sparkContext.stop():
> {code:java}
> Thread[OkHttp kubernetes.default.svc,5,main]
> Thread[OkHttp kubernetes.default.svc Writer,5,main]
> {code}
> Could you tell please, if it is possible to solve this problem?
> Docker image from the official release of spark-3.1.1 hadoop3.2 is used.






[jira] [Commented] (SPARK-34674) Spark app on k8s doesn't terminate without call to sparkContext.stop() method

2021-03-20 Thread Sergey Kotlov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305419#comment-17305419
 ] 

Sergey Kotlov commented on SPARK-34674:
---

Thanks for the response, [~dongjoon]

I haven't tried the previous versions of Spark on Kubernetes. Right now my company is using Spark on AWS EMR (which uses YARN), and we have never needed to call spark.stop() there. As far as I know, Spark uses a shutdown hook to stop SparkContext anyway before exiting the JVM. But in my example, if I understand correctly, these non-daemon threads prevent the JVM process from exiting even after the application's main method has completed, and I think that can be considered a bug.
P.S. I have to migrate a large number of existing Spark batch jobs (owned by different people across the company) from EMR to K8S, and right now it is desirable to keep the code of these jobs unchanged.

> Spark app on k8s doesn't terminate without call to sparkContext.stop() method
> -
>
> Key: SPARK-34674
> URL: https://issues.apache.org/jira/browse/SPARK-34674
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.1.1
>Reporter: Sergey Kotlov
>Priority: Major
>
> Hello!
>  I have run into a problem that if I don't call the method 
> sparkContext.stop() explicitly, then a Spark driver process doesn't terminate 
> even after its Main method has been completed. This behaviour is different 
> from spark on yarn, where the manual sparkContext stopping is not required.
>  It looks like, the problem is in using non-daemon threads, which prevent the 
> driver jvm process from terminating.
>  At least I see two non-daemon threads, if I don't call sparkContext.stop():
> {code:java}
> Thread[OkHttp kubernetes.default.svc,5,main]
> Thread[OkHttp kubernetes.default.svc Writer,5,main]
> {code}
> Could you tell please, if it is possible to solve this problem?
> Docker image from the official release of spark-3.1.1 hadoop3.2 is used.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org