[jira] [Resolved] (SPARK-34763) col(), $"" and df("name") should handle quoted column names properly.

2021-03-23 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-34763.
-
Fix Version/s: 3.0.3
   3.1.2
   3.2.0
   Resolution: Fixed

> col(), $"" and df("name") should handle quoted column names properly.
> ---
>
> Key: SPARK-34763
> URL: https://issues.apache.org/jira/browse/SPARK-34763
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.7, 3.0.2, 3.2.0, 3.1.1
>Reporter: Kousuke Saruta
>Assignee: Kousuke Saruta
>Priority: Major
> Fix For: 3.2.0, 3.1.2, 3.0.3
>
>
> Quoted column names like `a``b.c` cannot be represented with col(), $"" 
> and df("") because they don't handle such column names properly.
> For example, if we have a following DataFrame.
> {code}
> val df1 = spark.sql("SELECT 'col1' AS `a``b.c`")
> {code}
> For the DataFrame, this query is successfully executed.
> {code}
> scala> df1.selectExpr("`a``b.c`").show
> +-----+
> |a`b.c|
> +-----+
> | col1|
> +-----+
> {code}
> But the following query will fail because df1("`a``b.c`") throws an exception.
> {code}
> scala> df1.select(df1("`a``b.c`")).show
> org.apache.spark.sql.AnalysisException: syntax error in attribute name: 
> `a``b.c`;
>   at 
> org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:152)
>   at 
> org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:162)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:121)
>   at org.apache.spark.sql.Dataset.resolve(Dataset.scala:221)
>   at org.apache.spark.sql.Dataset.col(Dataset.scala:1274)
>   at org.apache.spark.sql.Dataset.apply(Dataset.scala:1241)
>   ... 49 elided
> {code}
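
For illustration, a minimal Scala sketch of the quoting convention involved (the
helper name quoteColumnName is made up here, not part of any API): backticks
inside a quoted identifier are escaped by doubling them, which is why `a``b.c`
denotes the single column named a`b.c.

{code}
// Illustrative helper only: wrap a raw column name so it can be passed to
// col()/$""/df(""), doubling any embedded backticks.
def quoteColumnName(raw: String): String =
  "`" + raw.replace("`", "``") + "`"

val raw = "a`b.c"            // the literal column name
quoteColumnName(raw)         // "`a``b.c`", the form selectExpr already accepts
// With this fix in place, df1(quoteColumnName(raw)) is expected to resolve the
// column instead of throwing AnalysisException.
{code}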



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33720) Support submit to k8s only with token

2021-03-23 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-33720:
--
Affects Version/s: (was: 3.0.2)
   3.2.0

> Support submit to k8s only with token
> -
>
> Key: SPARK-33720
> URL: https://issues.apache.org/jira/browse/SPARK-33720
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 3.2.0
>Reporter: hong dongdong
>Assignee: hong dongdong
>Priority: Major
> Fix For: 3.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-33720) Support submit to k8s only with token

2021-03-23 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-33720:
-

Assignee: hong dongdong

> Support submit to k8s only with token
> -
>
> Key: SPARK-33720
> URL: https://issues.apache.org/jira/browse/SPARK-33720
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 3.0.2
>Reporter: hong dongdong
>Assignee: hong dongdong
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-33720) Support submit to k8s only with token

2021-03-23 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-33720.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

Issue resolved by pull request 30684
[https://github.com/apache/spark/pull/30684]

> Support submit to k8s only with token
> -
>
> Key: SPARK-33720
> URL: https://issues.apache.org/jira/browse/SPARK-33720
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 3.0.2
>Reporter: hong dongdong
>Assignee: hong dongdong
>Priority: Major
> Fix For: 3.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34847) Simplify ResolveAggregateFunctions

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34847:


Assignee: (was: Apache Spark)

> Simplify ResolveAggregateFunctions
> --
>
> Key: SPARK-34847
> URL: https://issues.apache.org/jira/browse/SPARK-34847
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Wenchen Fan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34847) Simplify ResolveAggregateFunctions

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307571#comment-17307571
 ] 

Apache Spark commented on SPARK-34847:
--

User 'cloud-fan' has created a pull request for this issue:
https://github.com/apache/spark/pull/31947

> Simplify ResolveAggregateFunctions
> --
>
> Key: SPARK-34847
> URL: https://issues.apache.org/jira/browse/SPARK-34847
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Wenchen Fan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34847) Simplify ResolveAggregateFunctions

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34847:


Assignee: Apache Spark

> Simplify ResolveAggregateFunctions
> --
>
> Key: SPARK-34847
> URL: https://issues.apache.org/jira/browse/SPARK-34847
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Wenchen Fan
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34847) Simplify ResolveAggregateFunctions

2021-03-23 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-34847:
---

 Summary: Simplify ResolveAggregateFunctions
 Key: SPARK-34847
 URL: https://issues.apache.org/jira/browse/SPARK-34847
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Wenchen Fan






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34835) Support TimestampType in UDT

2021-03-23 Thread Darcy Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Shen updated SPARK-34835:
---
Description: 
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


*From pandas to pyarrow*

{code:java}
if isinstance(dt, UserDefinedType):
s = s.apply(dt.serialize)

t = to_arrow_type(dt)

array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
{code}

The above code may not work. If UDT is like:
{code:java}
class ExamplePointWithTimeUDT(UserDefinedType):
    """
    User-defined type (UDT) for ExamplePointWithTime.
    """

    @classmethod
    def sqlType(self):
        return StructType([
            StructField("x", DoubleType(), False),
            StructField("y", DoubleType(), True),
            StructField("ts", TimestampType(), False),
        ])

    @classmethod
    def module(cls):
        return 'pyspark.sql.tests'

    @classmethod
    def scalaUDT(cls):
        return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'

    def serialize(self, obj):
        return [obj.x, obj.y, obj.ts]

    def deserialize(self, datum):
        return ExamplePointWithTime(datum[0], datum[1], datum[2])


class ExamplePointWithTime:
    """
    An example class to demonstrate UDT in Scala, Java, and Python.
    """

    __UDT__ = ExamplePointWithTimeUDT()

    def __init__(self, x, y, ts):
        self.x = x
        self.y = y
        self.ts = ts

    def __repr__(self):
        return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)

    def __str__(self):
        return "(%s,%s,%s)" % (self.x, self.y, self.ts)

    def __eq__(self, other):
        return isinstance(other, self.__class__) \
            and other.x == self.x and other.y == self.y \
            and other.ts == self.ts
{code}


*From pyarrow to Spark Internal*
Serialize and deserialize will fail.

See the failed PR demo: https://github.com/eddyxu/spark/pull/4


  was:
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


*From pandas to pyarrow*

{code:java}
if isinstance(dt, UserDefinedType):
s = s.apply(dt.serialize)

t = to_arrow_type(dt)

array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
{code}

The above code may not work. If UDT is like:
{code:java}
class ExamplePointWithTimeUDT(UserDefinedType):
    """
    User-defined type (UDT) for ExamplePointWithTime.
    """

    @classmethod
    def sqlType(self):
        return StructType([
            StructField("x", DoubleType(), False),
            StructField("y", DoubleType(), True),
            StructField("ts", TimestampType(), False),
        ])

    @classmethod
    def module(cls):
        return 'pyspark.sql.tests'

    @classmethod
    def scalaUDT(cls):
        return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'

    def serialize(self, obj):
        return [obj.x, obj.y, obj.ts]

    def deserialize(self, datum):
        return ExamplePointWithTime(datum[0], datum[1], datum[2])


class ExamplePointWithTime:
    """
    An example class to demonstrate UDT in Scala, Java, and Python.
    """

    __UDT__ = ExamplePointWithTimeUDT()

    def __init__(self, x, y, ts):
        self.x = x
        self.y = y
        self.ts = ts

    def __repr__(self):
        return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)

    def __str__(self):
        return "(%s,%s,%s)" % (self.x, self.y, self.ts)

    def __eq__(self, other):
        return isinstance(other, self.__class__) \
            and other.x == self.x and other.y == self.y \
            and other.ts == self.ts
{code}


*From pyarrow to Spark Internal*
Serialize and deserialize will fail.

https://github.com/eddyxu/spark/pull/4



> Support TimestampType in UDT
> 
>
> Key: SPARK-34835
> URL: https://issues.apache.org/jira/browse/SPARK-34835
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.0.2, 3.1.1
>Reporter: Darcy Shen
>Priority: Major
>
> For user defined type with TimestampType, conversion from pandas to pyarrow 
> and from pyarrow to Spark UnsafeRow needs to be carefully handled.
> *From pandas to pyarrow*
> {code:java}
> if isinstance(dt, UserDefinedType):
> s = s.apply(dt.serialize)
> t = to_arrow_type(dt)
> array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
> {code}
> The above code may not work. If UDT is like:
> {code:java}
> class ExamplePointWithTimeUDT(UserDefinedType):
> """
> User-defined type (UDT) for ExamplePointWithTime.
> """
> @classmethod
> def sqlType(self):
> return StructType([
> StructField("x", DoubleType(), False),
>  

[jira] [Updated] (SPARK-34835) Support TimestampType in UDT

2021-03-23 Thread Darcy Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Shen updated SPARK-34835:
---
Description: 
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


*From pandas to pyarrow*

{code:java}
if isinstance(dt, UserDefinedType):
s = s.apply(dt.serialize)

t = to_arrow_type(dt)

array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
{code}

The above code may not work. If UDT is like:
{code:java}
class ExamplePointWithTimeUDT(UserDefinedType):
"""
User-defined type (UDT) for ExamplePointWithTime.
"""

@classmethod
def sqlType(self):
return StructType([
StructField("x", DoubleType(), False),
StructField("y", DoubleType(), True),
StructField("ts", TimestampType(), False),
])

@classmethod
def module(cls):
return 'pyspark.sql.tests'

@classmethod
def scalaUDT(cls):
return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'

def serialize(self, obj):
return [obj.x, obj.y, obj.ts]

def deserialize(self, datum):
return ExamplePointWithTime(datum[0], datum[1], datum[2])


class ExamplePointWithTime:
"""
An example class to demonstrate UDT in Scala, Java, and Python.
"""

__UDT__ = ExamplePointWithTimeUDT()

def __init__(self, x, y, ts):
self.x = x
self.y = y
self.ts = ts

def __repr__(self):
return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)

def __str__(self):
return "(%s,%s,%s)" % (self.x, self.y, self.ts)

def __eq__(self, other):
return isinstance(other, self.__class__) \
and other.x == self.x and other.y == self.y \
and other.ts == self.ts
{code}


*From pyarrow to Spark Internal*
Serialize and deserialize will fail.

https://github.com/eddyxu/spark/pull/4


  was:
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


*From pandas to pyarrow*

{code:java}
if isinstance(dt, UserDefinedType):
s = s.apply(dt.serialize)

t = to_arrow_type(dt)

array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
{code}

The above code may not work. If UDT is like:
{code:java}
class ExamplePointWithTimeUDT(UserDefinedType):
"""
User-defined type (UDT) for ExamplePointWithTime.
"""

@classmethod
def sqlType(self):
return StructType([
StructField("x", DoubleType(), False),
StructField("y", DoubleType(), True),
StructField("ts", TimestampType(), False),
])

@classmethod
def module(cls):
return 'pyspark.sql.tests'

@classmethod
def scalaUDT(cls):
return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'

def serialize(self, obj):
return [obj.x, obj.y, obj.ts]

def deserialize(self, datum):
return ExamplePointWithTime(datum[0], datum[1], datum[2])


class ExamplePointWithTime:
"""
An example class to demonstrate UDT in Scala, Java, and Python.
"""

__UDT__ = ExamplePointWithTimeUDT()

def __init__(self, x, y, ts):
self.x = x
self.y = y
self.ts = ts

def __repr__(self):
return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)

def __str__(self):
return "(%s,%s,%s)" % (self.x, self.y, self.ts)

def __eq__(self, other):
return isinstance(other, self.__class__) \
and other.x == self.x and other.y == self.y \
and other.ts == self.ts
{code}


*From pyarrow to Spark Internal*
Serialize and deserialize will fail.



> Support TimestampType in UDT
> 
>
> Key: SPARK-34835
> URL: https://issues.apache.org/jira/browse/SPARK-34835
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.0.2, 3.1.1
>Reporter: Darcy Shen
>Priority: Major
>
> For user defined type with TimestampType, conversion from pandas to pyarrow 
> and from pyarrow to Spark UnsafeRow needs to be carefully handled.
> *From pandas to pyarrow*
> {code:java}
> if isinstance(dt, UserDefinedType):
> s = s.apply(dt.serialize)
> t = to_arrow_type(dt)
> array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
> {code}
> The above code may not work. If UDT is like:
> {code:java}
> class ExamplePointWithTimeUDT(UserDefinedType):
> """
> User-defined type (UDT) for ExamplePointWithTime.
> """
> @classmethod
> def sqlType(self):
> return StructType([
> StructField("x", DoubleType(), False),
> StructField("y", DoubleType(), True),
> 

[jira] [Updated] (SPARK-34835) Support TimestampType in UDT

2021-03-23 Thread Darcy Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Shen updated SPARK-34835:
---
Description: 
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


*From pandas to pyarrow*

{code:java}
if isinstance(dt, UserDefinedType):
s = s.apply(dt.serialize)

t = to_arrow_type(dt)

array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
{code}

The above code may not work. If UDT is like:
{code:java}
class ExamplePointWithTimeUDT(UserDefinedType):
"""
User-defined type (UDT) for ExamplePointWithTime.
"""

@classmethod
def sqlType(self):
return StructType([
StructField("x", DoubleType(), False),
StructField("y", DoubleType(), True),
StructField("ts", TimestampType(), False),
])

@classmethod
def module(cls):
return 'pyspark.sql.tests'

@classmethod
def scalaUDT(cls):
return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'

def serialize(self, obj):
return [obj.x, obj.y, obj.ts]

def deserialize(self, datum):
return ExamplePointWithTime(datum[0], datum[1], datum[2])


class ExamplePointWithTime:
"""
An example class to demonstrate UDT in Scala, Java, and Python.
"""

__UDT__ = ExamplePointWithTimeUDT()

def __init__(self, x, y, ts):
self.x = x
self.y = y
self.ts = ts

def __repr__(self):
return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)

def __str__(self):
return "(%s,%s,%s)" % (self.x, self.y, self.ts)

def __eq__(self, other):
return isinstance(other, self.__class__) \
and other.x == self.x and other.y == self.y \
and other.ts == self.ts
{code}


*From pyarrow to Spark Internal*
Serialize and deserialize will fail.


  was:
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


From pandas to pyarrow

{code:java}
if isinstance(dt, UserDefinedType):
s = s.apply(dt.serialize)

t = to_arrow_type(dt)

array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
{code}

The above code may not work. If UDT is like:
{code:java}
class ExamplePointWithTimeUDT(UserDefinedType):
"""
User-defined type (UDT) for ExamplePointWithTime.
"""

@classmethod
def sqlType(self):
return StructType([
StructField("x", DoubleType(), False),
StructField("y", DoubleType(), True),
StructField("ts", TimestampType(), False),
])

@classmethod
def module(cls):
return 'pyspark.sql.tests'

@classmethod
def scalaUDT(cls):
return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'

def serialize(self, obj):
return [obj.x, obj.y, obj.ts]

def deserialize(self, datum):
return ExamplePointWithTime(datum[0], datum[1], datum[2])


class ExamplePointWithTime:
"""
An example class to demonstrate UDT in Scala, Java, and Python.
"""

__UDT__ = ExamplePointWithTimeUDT()

def __init__(self, x, y, ts):
self.x = x
self.y = y
self.ts = ts

def __repr__(self):
return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)

def __str__(self):
return "(%s,%s,%s)" % (self.x, self.y, self.ts)

def __eq__(self, other):
return isinstance(other, self.__class__) \
and other.x == self.x and other.y == self.y \
and other.ts == self.ts
{code}


From pyarrow to Spark Internal
Serialize and deserialize will fail.



> Support TimestampType in UDT
> 
>
> Key: SPARK-34835
> URL: https://issues.apache.org/jira/browse/SPARK-34835
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.0.2, 3.1.1
>Reporter: Darcy Shen
>Priority: Major
>
> For user defined type with TimestampType, conversion from pandas to pyarrow 
> and from pyarrow to Spark UnsafeRow needs to be carefully handled.
> *From pandas to pyarrow*
> {code:java}
> if isinstance(dt, UserDefinedType):
> s = s.apply(dt.serialize)
> t = to_arrow_type(dt)
> array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
> {code}
> The above code may not work. If UDT is like:
> {code:java}
> class ExamplePointWithTimeUDT(UserDefinedType):
> """
> User-defined type (UDT) for ExamplePointWithTime.
> """
> @classmethod
> def sqlType(self):
> return StructType([
> StructField("x", DoubleType(), False),
> StructField("y", DoubleType(), True),
> StructField("ts", TimestampType(), False),

[jira] [Updated] (SPARK-34835) Support TimestampType in UDT

2021-03-23 Thread Darcy Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Shen updated SPARK-34835:
---
Description: 
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


From pandas to pyarrow

{code:java}
if isinstance(dt, UserDefinedType):
s = s.apply(dt.serialize)

t = to_arrow_type(dt)

array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
{code}

The above code may not work. If UDT is like:
{code:java}
class ExamplePointWithTimeUDT(UserDefinedType):
"""
User-defined type (UDT) for ExamplePointWithTime.
"""

@classmethod
def sqlType(self):
return StructType([
StructField("x", DoubleType(), False),
StructField("y", DoubleType(), True),
StructField("ts", TimestampType(), False),
])

@classmethod
def module(cls):
return 'pyspark.sql.tests'

@classmethod
def scalaUDT(cls):
return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'

def serialize(self, obj):
return [obj.x, obj.y, obj.ts]

def deserialize(self, datum):
return ExamplePointWithTime(datum[0], datum[1], datum[2])


class ExamplePointWithTime:
"""
An example class to demonstrate UDT in Scala, Java, and Python.
"""

__UDT__ = ExamplePointWithTimeUDT()

def __init__(self, x, y, ts):
self.x = x
self.y = y
self.ts = ts

def __repr__(self):
return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)

def __str__(self):
return "(%s,%s,%s)" % (self.x, self.y, self.ts)

def __eq__(self, other):
return isinstance(other, self.__class__) \
and other.x == self.x and other.y == self.y \
and other.ts == self.ts
{code}


From pyarrow to Spark Internal
Serialize and deserialize will fail.


  was:
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


## From pandas to pyarrow

{code:java}
pa.StructArray.from_pandas
{code}


## From pyarrow to Spark Internal
Serialize and deserialize will fail.



> Support TimestampType in UDT
> 
>
> Key: SPARK-34835
> URL: https://issues.apache.org/jira/browse/SPARK-34835
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.0.2, 3.1.1
>Reporter: Darcy Shen
>Priority: Major
>
> For user defined type with TimestampType, conversion from pandas to pyarrow 
> and from pyarrow to Spark UnsafeRow needs to be carefully handled.
> From pandas to pyarrow
> {code:java}
> if isinstance(dt, UserDefinedType):
> s = s.apply(dt.serialize)
> t = to_arrow_type(dt)
> array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
> {code}
> The above code may not work. If UDT is like:
> {code:java}
> class ExamplePointWithTimeUDT(UserDefinedType):
> """
> User-defined type (UDT) for ExamplePointWithTime.
> """
> @classmethod
> def sqlType(self):
> return StructType([
> StructField("x", DoubleType(), False),
> StructField("y", DoubleType(), True),
> StructField("ts", TimestampType(), False),
> ])
> @classmethod
> def module(cls):
> return 'pyspark.sql.tests'
> @classmethod
> def scalaUDT(cls):
> return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'
> def serialize(self, obj):
> return [obj.x, obj.y, obj.ts]
> def deserialize(self, datum):
> return ExamplePointWithTime(datum[0], datum[1], datum[2])
> class ExamplePointWithTime:
> """
> An example class to demonstrate UDT in Scala, Java, and Python.
> """
> __UDT__ = ExamplePointWithTimeUDT()
> def __init__(self, x, y, ts):
> self.x = x
> self.y = y
> self.ts = ts
> def __repr__(self):
> return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)
> def __str__(self):
> return "(%s,%s,%s)" % (self.x, self.y, self.ts)
> def __eq__(self, other):
> return isinstance(other, self.__class__) \
> and other.x == self.x and other.y == self.y \
> and other.ts == self.ts
> {code}
> From pyarrow to Spark Internal
> Serialize and deserialize will fail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34835) Support TimestampType in UDT

2021-03-23 Thread Darcy Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Shen updated SPARK-34835:
---
Description: 
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


## From pandas to pyarrow

{code:java}
pa.StructArray.from_pandas
{code}


## From pyarrow to Spark Internal
Serialize and deserialize will fail.


  was:
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


## From pandas to pyarrow

{code:java}
pa.StructArray.from_pandas
{code}


## From pyarrow to Spark Internal
Serialize and deserialize will fail.



> Support TimestampType in UDT
> 
>
> Key: SPARK-34835
> URL: https://issues.apache.org/jira/browse/SPARK-34835
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.0.2, 3.1.1
>Reporter: Darcy Shen
>Priority: Major
>
> For user defined type with TimestampType, conversion from pandas to pyarrow 
> and from pyarrow to Spark UnsafeRow needs to be carefully handled.
> ## From pandas to pyarrow
> {code:java}
> pa.StructArray.from_pandas
> {code}
> ## From pyarrow to Spark Internal
> Serialize and deserialize will fail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34835) Support TimestampType in UDT

2021-03-23 Thread Darcy Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Shen updated SPARK-34835:
---
Description: 
For user defined type with TimestampType, conversion from pandas to pyarrow and 
from pyarrow to Spark UnsafeRow needs to be carefully handled.


## From pandas to pyarrow

{code:java}
pa.StructArray.from_pandas
{code}


## From pyarrow to Spark Internal
Serialize and deserialize will fail.


  was:For user defined type with TimestampType, conversion from pandas to 
pyarrow and from pyarrow to Spark UnsafeRow needs to be carefully handled.


> Support TimestampType in UDT
> 
>
> Key: SPARK-34835
> URL: https://issues.apache.org/jira/browse/SPARK-34835
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.0.2, 3.1.1
>Reporter: Darcy Shen
>Priority: Major
>
> For user defined type with TimestampType, conversion from pandas to pyarrow 
> and from pyarrow to Spark UnsafeRow needs to be carefully handled.
> ## From pandas to pyarrow
> {code:java}
> pa.StructArray.from_pandas
> {code}
> ## From pyarrow to Spark Internal
> Serialize and deserialize will fail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34835) Support TimestampType in UDT

2021-03-23 Thread Darcy Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Darcy Shen updated SPARK-34835:
---
Description: For user defined type with TimestampType, conversion from 
pandas to pyarrow and from pyarrow to Spark UnsafeRow needs to be carefully 
handled.

> Support TimestampType in UDT
> 
>
> Key: SPARK-34835
> URL: https://issues.apache.org/jira/browse/SPARK-34835
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.0.2, 3.1.1
>Reporter: Darcy Shen
>Priority: Major
>
> For user defined type with TimestampType, conversion from pandas to pyarrow 
> and from pyarrow to Spark UnsafeRow needs to be carefully handled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-03-23 Thread Dongjoon Hyun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307557#comment-17307557
 ] 

Dongjoon Hyun commented on SPARK-18105:
---

Got it. Thank you for the details, [~devaraj]. Let's keep this open for a while 
because this issue needs more information.

> LZ4 failed to decompress a stream of shuffled data
> --
>
> Key: SPARK-18105
> URL: https://issues.apache.org/jira/browse/SPARK-18105
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Davies Liu
>Priority: Major
>
> When lz4 is used to compress the shuffle files, it may fail to decompress it 
> as "stream is corrupt"
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in 
> stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at 
> org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34674) Spark app on k8s doesn't terminate without call to sparkContext.stop() method

2021-03-23 Thread Dongjoon Hyun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307554#comment-17307554
 ] 

Dongjoon Hyun commented on SPARK-34674:
---

Let's keep this issue open for a while to collect more opinions. We need more 
information about this issue report.

> Spark app on k8s doesn't terminate without call to sparkContext.stop() method
> -
>
> Key: SPARK-34674
> URL: https://issues.apache.org/jira/browse/SPARK-34674
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.1.1
>Reporter: Sergey Kotlov
>Priority: Major
>
> Hello!
>  I have run into a problem: if I don't call sparkContext.stop() explicitly, 
> the Spark driver process doesn't terminate even after its main method has 
> completed. This behaviour is different from Spark on YARN, where stopping the 
> sparkContext manually is not required.
>  It looks like the problem lies in the use of non-daemon threads, which 
> prevent the driver JVM process from terminating.
>  At least I see two non-daemon threads if I don't call sparkContext.stop():
> {code:java}
> Thread[OkHttp kubernetes.default.svc,5,main]
> Thread[OkHttp kubernetes.default.svc Writer,5,main]
> {code}
> Could you please tell me whether it is possible to solve this problem?
> The Docker image from the official spark-3.1.1 hadoop3.2 release is used.
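
For comparison, the pattern that does let the driver JVM exit is to stop the
context explicitly when the application finishes. A minimal sketch (the
application logic is a placeholder):

{code}
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("example").getOrCreate()
    try {
      spark.range(10).count()   // placeholder application logic
    } finally {
      spark.stop()              // without this call, the non-daemon OkHttp
                                // threads above keep the driver JVM alive on k8s
    }
  }
}
{code}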



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307502#comment-17307502
 ] 

Jason Yarbrough commented on SPARK-34843:
-

I'm definitely interested. Thank you :)

Fortunately, the proposed formula should handle potential overflow issues just 
as well as the old method, and it also retains the highest level of accuracy 
you can get for the stride size (short of adjusting the partition count... I've 
also coded up functionality that can dynamically determine that).

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / 
> (numPartitions - 2.0))).floor.toLong
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> ((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 
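
A small Scala snippet reproducing the arithmetic from the description, using
the values of the date example above:

{code}
val lowerBound = -15611L       // 1927-04-05 as days since the epoch
val upperBound = 18563L        // 2020-10-27 as days since the epoch
val numPartitions = 1000L

// Current formula: both divisions truncate before the subtraction.
val currentStride =
  upperBound / numPartitions - lowerBound / numPartitions                        // 33

// Proposed formula: divide by (numPartitions - 2.0) and floor once at the end.
val proposedStride =
  ((upperBound / (numPartitions - 2.0)) - (lowerBound / (numPartitions - 2.0)))
    .floor.toLong                                                                // 34

// Start of the last partition after 998 strides from the lower bound:
val lastStartCurrent  = lowerBound + 998 * currentStride    // 17323 -> 2017-06-05
val lastStartProposed = lowerBound + 998 * proposedStride   // 18321 -> 2020-02-28
{code}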



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition

2021-03-23 Thread Jason Yarbrough (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307500#comment-17307500
 ] 

Jason Yarbrough commented on SPARK-34844:
-

The way I'm currently supplying it is by using a Map with lowerBound, 
upperBound, numPartitions, and partitionColumn. For example:

Map(
  "lowerBound" -> "1",
  "upperBound" -> "10",
  "numPartitions" -> "1000",
  "partitionColumn" -> "SomeIndexedColumn"
 )

In this case, I would expect the Spark code to respect the lower boundary, but 
it won't.

What makes this even more important is that some users may provide very 
well-chosen bounds. For instance, I've coded up an "auto-partitioner." One of 
the things it does is use the approxQuantile function to determine a very 
accurate lower and upper bound (based on the partition count).

I actually wish we could provide more breakpoints than just the lower and upper 
bounds, but that's for another ticket ;).
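
For context, a sketch of how such a Map ends up on the JDBC reader (the
connection details and table name below are placeholders, and an existing
SparkSession named spark is assumed):

{code}
val partitionOpts = Map(
  "lowerBound"      -> "1",
  "upperBound"      -> "10",
  "numPartitions"   -> "1000",
  "partitionColumn" -> "SomeIndexedColumn"
)

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/db")   // placeholder connection string
  .option("dbtable", "some_table")                   // placeholder table name
  .options(partitionOpts)
  .load()
{code}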

> JDBCRelation columnPartition function includes the first stride in the lower 
> partition
> --
>
> Key: SPARK-34844
> URL: https://issues.apache.org/jira/browse/SPARK-34844
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
>
> Currently, columnPartition in JDBCRelation contains logic that adds the first 
> stride into the lower partition. Because of this, the lower bound isn't used 
> as the ceiling for the lower partition.
> For example, say we have data 0-10, 10 partitions, and the lowerBound is set 
> to 1. The lower/first partition should contain anything < 1. However, in the 
> current implementation, it would include anything < 2.
> A possible easy fix would be changing the following code on line 132:
> currentValue += stride
> To:
> if (i != 0) currentValue += stride
> Or include currentValue += stride within the if statement on line 131... 
> although this creates a pretty bad looking side-effect.
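
A simplified sketch of the boundary behaviour being discussed (not the actual
JDBCRelation code), using the 0-10 example from the description with a stride
of 1:

{code}
val lowerBound = 1L
val stride = 1L

// Current behaviour: the first stride is added before the first boundary is
// emitted, so the lowest partition covers "< 2".
val firstBoundaryCurrent = lowerBound + stride   // 2

// With the proposed `if (i != 0) currentValue += stride`, the first emitted
// boundary stays at the supplied lower bound, so the lowest partition covers "< 1".
val firstBoundaryProposed = lowerBound           // 1
{code}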



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Takeshi Yamamuro (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307498#comment-17307498
 ] 

Takeshi Yamamuro commented on SPARK-34843:
--

If the overflow is rare, it might be worth improving. For example, we might be 
able to check in advance whether the overflow can happen. Anyway, if you're 
interested in this improvement, please feel free to open a PR for it.
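
One possible shape for such a pre-check, just to illustrate the idea (a
hypothetical helper, not a concrete patch): compute the range with exact
arithmetic and fall back to the current per-operand division only if it
overflows.

{code}
def safeStride(lowerBound: Long, upperBound: Long, numPartitions: Int): Long =
  try {
    // The exact range fits in a Long, so a single division is enough.
    Math.subtractExact(upperBound, lowerBound) / numPartitions
  } catch {
    case _: ArithmeticException =>
      // The range overflows Long: keep the existing truncating formula.
      upperBound / numPartitions - lowerBound / numPartitions
  }
{code}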

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / 
> (numPartitions - 2.0))).floor.toLong
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> ((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307494#comment-17307494
 ] 

Jason Yarbrough edited comment on SPARK-34843 at 3/24/21, 1:32 AM:
---

Updated the formula. I also updated the comment and included the example you 
provided, to help others understand.


was (Author: hanover-fiste):
Updated the formula.

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / 
> (numPartitions - 2.0))).floor.toLong
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> ((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34843:

Attachment: (was: SPARK-34843.patch)

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / 
> (numPartitions - 2.0))).floor.toLong
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> ((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34843:

Attachment: SPARK-34843.patch

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / 
> (numPartitions - 2.0))).floor.toLong
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> ((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition

2021-03-23 Thread Takeshi Yamamuro (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307496#comment-17307496
 ] 

Takeshi Yamamuro edited comment on SPARK-34844 at 3/24/21, 1:30 AM:


> It depends on how much data skew you have.

Yea, I agree with that. What I wanted to say is that whether the proposed one 
works well or not also depends on data distribution.

> However, my question would be why skip using the lower bound anyway? It would 
> make more sense to use what the user supplied.

IIUC we don't have a strong reason on that (we've used the initial 
implementation for a long time: 
[https://github.com/apache/spark/commit/8f471a66db0571a76a21c0d93312197fee16174a]).
 I think there is no best solution if we don't know data distribution in 
advance.

Any idea about how a user supplies these boundaries for partitions?


was (Author: maropu):
> It depends on how much data skew you have.

Yea, I agree with that. What I wanted to say is that whether the proposed one 
works well or not also depends on data distribution.

> However, my question would be why skip using the lower bound anyway? It would 
> make more sense to use what the user supplied.

IIUC we don't have a strong reason on that (we've used the initial 
implementation for a long time: 
[https://github.com/apache/spark/commit/8f471a66db0571a76a21c0d93312197fee16174a]).
 I think there is no best solution if we don't know data distribution in 
advance.

Any idea about how a user supply these boundaries for partitions?

> JDBCRelation columnPartition function includes the first stride in the lower 
> partition
> --
>
> Key: SPARK-34844
> URL: https://issues.apache.org/jira/browse/SPARK-34844
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
>
> Currently, columnPartition in JDBCRelation contains logic that adds the first 
> stride into the lower partition. Because of this, the lower bound isn't used 
> as the ceiling for the lower partition.
> For example, say we have data 0-10, 10 partitions, and the lowerBound is set 
> to 1. The lower/first partition should contain anything < 1. However, in the 
> current implementation, it would include anything < 2.
> A possible easy fix would be changing the following code on line 132:
> currentValue += stride
> To:
> if (i != 0) currentValue += stride
> Or include currentValue += stride within the if statement on line 131... 
> although this creates a pretty bad looking side-effect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition

2021-03-23 Thread Takeshi Yamamuro (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307496#comment-17307496
 ] 

Takeshi Yamamuro commented on SPARK-34844:
--

> It depends on how much data skew you have.

Yea, I agree with that. What I wanted to say is that whether the proposed one 
works well or not also depends on data distribution.

> However, my question would be why skip using the lower bound anyway? It would 
> make more sense to use what the user supplied.

IIUC we don't have a strong reason on that (we've used the initial 
implementation for a long time: 
[https://github.com/apache/spark/commit/8f471a66db0571a76a21c0d93312197fee16174a]).
 I think there is no best solution if we don't know data distribution in 
advance.

Any idea about how a user supply these boundaries for partitions?

> JDBCRelation columnPartition function includes the first stride in the lower 
> partition
> --
>
> Key: SPARK-34844
> URL: https://issues.apache.org/jira/browse/SPARK-34844
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
>
> Currently, columnPartition in JDBCRelation contains logic that adds the first 
> stride into the lower partition. Because of this, the lower bound isn't used 
> as the ceiling for the lower partition.
> For example, say we have data 0-10, 10 partitions, and the lowerBound is set 
> to 1. The lower/first partition should contain anything < 1. However, in the 
> current implementation, it would include anything < 2.
> A possible easy fix would be changing the following code on line 132:
> currentValue += stride
> To:
> if (i != 0) currentValue += stride
> Or include currentValue += stride within the if statement on line 131... 
> although this creates a pretty bad looking side-effect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34843:

Attachment: SPARK-34843.patch

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / 
> (numPartitions - 2.0))).floor.toLong
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> ((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 
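
For reference, a quick Scala check of the two calculations above, using the 
example values from the description (a sketch, not Spark's actual code):

{code}
// lowerBound = 1927-04-05 (-15611 days), upperBound = 2020-10-27 (18563 days)
val lowerBound = -15611L
val upperBound = 18563L
val numPartitions = 1000L

// Current formula: truncation happens on both divisions.
val currentStride = upperBound / numPartitions - lowerBound / numPartitions  // 33

// Proposed formula from the description.
val proposedStride =
  ((upperBound / (numPartitions - 2.0)) - (lowerBound / (numPartitions - 2.0))).floor.toLong  // 34

// Start of the last partition after 998 strides from the lower bound.
val lastStartCurrent  = lowerBound + 998 * currentStride   // 17323 (2017-06-05)
val lastStartProposed = lowerBound + 998 * proposedStride  // 18321 (2020-02-28)
{code}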



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34843:

Attachment: (was: SPARK-34843.patch)

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / 
> (numPartitions - 2.0))).floor.toLong
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> ((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307494#comment-17307494
 ] 

Jason Yarbrough commented on SPARK-34843:
-

Updated the formula.

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / 
> (numPartitions - 2.0))).floor.toLong
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> ((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34843:

Description: 
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / 
(numPartitions - 2.0))).floor.toLong

 

An example (using a date column):

Say you're creating 1,000 partitions. If you provide a lower bound of 
1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
(translated to 18563), Spark determines the stride size as follows:

 

(18563L / 1000L) - (-15611 / 1000L) = 33

Starting from the lower bound, doing 998 strides of 33, you end up at 
2017-06-05 (17323). This is over 3 years of extra data that will go into the 
last partition, and depending on the shape of the data could cause a very long 
running task at the end of a job.

 

Using the formula I'm proposing, you'd get:

((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34

This would put the upper bound at 2020-02-28 (18321), which is much closer to 
the original supplied upper bound. This is the best you can do to get as close 
as possible to the upper bound (without adjusting the number of partitions). 
For example, a stride size of 35 would go well past the supplied upper bound 
(over 2 years, 2022-11-22).

 

In the above example, there is only a difference of 1 between the stride size 
using the current formula and the stride size using the proposed formula, but 
with greater distance between the lower and upper bounds, or a lower number of 
partitions, the difference can be much greater. 

  was:
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = (upperBound - lowerBound) / (numPartitions - 2)

 

An example (using a date column):

Say you're creating 1,000 partitions. If you provide a lower bound of 
1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
(translated to 18563), Spark determines the stride size as follows:

 

(18563L / 1000L) - (-15611 / 1000L) = 33

Starting from the lower bound, doing 998 strides of 33, you end up at 
2017-06-05 (17323). This is over 3 years of extra data that will go into the 
last partition, and depending on the shape of the data could cause a very long 
running task at the end of a job.

 

Using the formula I'm proposing, you'd get:

(18563L - -15611) / (1000L - 2) = 34

This would put the upper bound at 2020-02-28 (18321), which is much closer to 
the original supplied upper bound. This is the best you can do to get as close 
as possible to the upper bound (without adjusting the number of partitions). 
For example, a stride size of 35 would go well past the supplied upper bound 
(over 2 years, 2022-11-22).

 

In the above example, there is only a difference of 1 between the stride size 
using the current formula and the stride size using the proposed formula, but 
with greater distance between the lower and upper bounds, or a lower number of 
partitions, the difference can be much greater. 


> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / 
> 

[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307493#comment-17307493
 ] 

Jason Yarbrough commented on SPARK-34843:
-

That's a good example! Thank you for sharing that. I have a new formula that 
will handle that while also retaining the accuracy of the other proposed 
formula.

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = (upperBound - lowerBound) / (numPartitions - 2)
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> (18563L - -15611) / (1000L - 2) = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34846) Sort output rows in SQLQueryTestSuite even if a plan has a Sort node

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34846:


Assignee: Apache Spark

> Sort output rows in SQLQueryTestSuite even if a plan has a Sort node
> 
>
> Key: SPARK-34846
> URL: https://issues.apache.org/jira/browse/SPARK-34846
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.2, 3.0.3
>Reporter: Takeshi Yamamuro
>Assignee: Apache Spark
>Priority: Minor
>
> This ticket aims at sorting output rows in SQLQueryTestSuite even if a given 
> plan has a `Sort` node. In that case, the current logic skips sorting output 
> rows (see the code below for details):
> https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L503-L522
> But I think we still need to sort output rows if the node only sorts output 
> rows by partial columns (instead of all columns). The current logic can lead 
> to flaky test behaviour.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34846) Sort output rows in SQLQueryTestSuite even if a plan has a Sort node

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307492#comment-17307492
 ] 

Apache Spark commented on SPARK-34846:
--

User 'maropu' has created a pull request for this issue:
https://github.com/apache/spark/pull/31946

> Sort output rows in SQLQueryTestSuite even if a plan has a Sort node
> 
>
> Key: SPARK-34846
> URL: https://issues.apache.org/jira/browse/SPARK-34846
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.2, 3.0.3
>Reporter: Takeshi Yamamuro
>Priority: Minor
>
> This ticket aims at sorting output rows in SQLQueryTestSuite even if a given 
> plan has a `Sort` node. In that case, the current logic skips sorting output 
> rows (see the code below for details):
> https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L503-L522
> But I think we still need to sort output rows if the node only sorts output 
> rows by partial columns (instead of all columns). The current logic can lead 
> to flaky test behaviour.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34846) Sort output rows in SQLQueryTestSuite even if a plan has a Sort node

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307490#comment-17307490
 ] 

Apache Spark commented on SPARK-34846:
--

User 'maropu' has created a pull request for this issue:
https://github.com/apache/spark/pull/31946

> Sort output rows in SQLQueryTestSuite even if a plan has a Sort node
> 
>
> Key: SPARK-34846
> URL: https://issues.apache.org/jira/browse/SPARK-34846
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.2, 3.0.3
>Reporter: Takeshi Yamamuro
>Priority: Minor
>
> This ticket aims at sorting output rows in SQLQueryTestSuite even if a given 
> plan has a `Sort` node. In that case, the current logic skips sorting output 
> rows (see the code below for details):
> https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L503-L522
> But I think we still need to sort output rows if the node only sorts output 
> rows by partial columns (instead of all columns). The current logic can lead 
> to flaky test behaviour.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34846) Sort output rows in SQLQueryTestSuite even if a plan has a Sort node

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34846:


Assignee: (was: Apache Spark)

> Sort output rows in SQLQueryTestSuite even if a plan has a Sort node
> 
>
> Key: SPARK-34846
> URL: https://issues.apache.org/jira/browse/SPARK-34846
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.2, 3.0.3
>Reporter: Takeshi Yamamuro
>Priority: Minor
>
> This ticket aims at sorting output rows in SQLQueryTestSuite even if a given 
> plan has a `Sort` node. In that case, the current logic skips sorting output 
> rows (see the code below for details):
> https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L503-L522
> But I think we still need to sort output rows if the node only sorts output 
> rows by partial columns (instead of all columns). The current logic can lead 
> to flaky test behaviour.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34846) Sort output rows in SQLQueryTestSuite even if a plan has a Sort node

2021-03-23 Thread Takeshi Yamamuro (Jira)
Takeshi Yamamuro created SPARK-34846:


 Summary: Sort output rows in SQLQueryTestSuite even if a plan has 
a Sort node
 Key: SPARK-34846
 URL: https://issues.apache.org/jira/browse/SPARK-34846
 Project: Spark
  Issue Type: Bug
  Components: SQL, Tests
Affects Versions: 3.2.0, 3.1.2, 3.0.3
Reporter: Takeshi Yamamuro


This ticket aims at sorting output rows in SQLQueryTestSuite even if a given 
plan has a `Sort` node. In that case, the current logic skips sorting output 
rows (see the code below for details):
https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L503-L522

But I think we still need to sort output rows if the node only sorts output 
rows by partial columns (instead of all columns). The current logic can lead to 
flaky test behaviour.
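
For illustration, here is why a partial-column sort is not enough for a stable 
comparison (a Scala sketch under my own assumptions, not the suite's code):

{code}
// Two runs can legally return tied rows in different orders.
val run1 = Seq(("a", 1), ("b", 1))  // both rows share the sort key 1
val run2 = Seq(("b", 1), ("a", 1))  // a different but equally valid order

// Comparing the partially sorted output as-is can flake on ties...
assert(run1.sortBy(_._2) != run2.sortBy(_._2))

// ...while sorting the full row representations makes the comparison stable.
assert(run1.map(_.toString).sorted == run2.map(_.toString).sorted)
{code}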



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-30641) Project Matrix: Linear Models revisit and refactor

2021-03-23 Thread zhengruifeng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengruifeng reassigned SPARK-30641:


Assignee: (was: zhengruifeng)

> Project Matrix: Linear Models revisit and refactor
> --
>
> Key: SPARK-30641
> URL: https://issues.apache.org/jira/browse/SPARK-30641
> Project: Spark
>  Issue Type: New Feature
>  Components: ML, PySpark
>Affects Versions: 3.1.0, 3.2.0
>Reporter: zhengruifeng
>Priority: Major
>
> We have been refactoring linear models for a long time, and there is still 
> some work ahead. After some discussion among [~huaxingao] [~srowen] 
> [~weichenxu123], we decided to gather the related work under a sub-project 
> Matrix, which includes:
>  # *Blockification (vectorization of vectors)*
>  ** vectors are stacked into matrices, so that high-level BLAS can be used 
> for better performance (about ~3x faster on sparse datasets, up to ~18x 
> faster on dense datasets; see SPARK-31783 for details).
>  ** Since 3.1.1, LoR/SVC/LiR/AFT support blockification, and we need to 
> blockify KMeans in the future.
>  # *Standardization (virtual centering)*
>  ** The existing implementation of standardization in linear models does NOT 
> center the vectors by removing the means, for the purpose of keeping dataset 
> _*sparsity*_. However, this causes feature values with small variance to be 
> scaled to large values, and an underlying solver like LBFGS cannot handle 
> this case efficiently; see SPARK-34448 for details.
>  ** If internal vectors are centered (like other well-known implementations, 
> e.g. GLMNET/Scikit-Learn), convergence will be better. In the case in 
> SPARK-34448, the number of iterations to convergence is reduced from 93 to 
> 6. Moreover, the final solution is much closer to the one in GLMNET.
>  ** Luckily, we found a new way to _*virtually*_ center the vectors without 
> densifying the dataset. Good results have been observed in LoR, and we will 
> take it into account in other linear models.
>  # _*Initialization (To be discussed)*_
>  ** Initializing model coefficients from a given model should be beneficial 
> to: 1, convergence (it should reduce the number of iterations); 2, model 
> stability (it may obtain a new solution closer to the previous one);
>  # _*Early Stopping* *(To be discussed)*_
>  ** we can compute the test error during training (like tree models) and 
> stop the training procedure if the test error begins to increase;
>  
>   If you want to add other features in these models, please comment in 
> the ticket.
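
For reference, the identity that makes "virtual" centering possible is simply 
w . (x - mu) = w . x - w . mu, so the centered dot product can be computed 
without densifying the data. A tiny sketch (an illustration, not Spark ML's 
actual code):

{code}
val w  = Array(0.5, -1.0, 2.0)   // coefficients
val x  = Array(0.0,  3.0, 0.0)   // a sparse-ish instance
val mu = Array(1.0,  2.0, 0.5)   // per-feature means

def dot(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (p, q) => p * q }.sum

val centeredExplicitly = dot(w, x.zip(mu).map { case (xi, mi) => xi - mi })  // densifies x - mu
val centeredVirtually  = dot(w, x) - dot(w, mu)                              // never densifies x
assert(math.abs(centeredExplicitly - centeredVirtually) < 1e-12)
{code}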



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition

2021-03-23 Thread Jason Yarbrough (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307484#comment-17307484
 ] 

Jason Yarbrough commented on SPARK-34844:
-

It depends on how much data skew you have. I think the main thing here is that 
the code seems to needlessly lump the first stride in with the lower 
partition; it's not expected behavior. If I set a lower bound of 1, it should 
be 1, not 2 (using the example above). I can understand that the last 
partition boundary will not be perfect due to the strides not lining up 
exactly (although the current formula can create very suboptimal results), but 
I don't understand why the lower bound is skipped for the first partition.

 

When working with data that is skewed, these things can add up and make a big 
difference. We shaved off 10 minutes on a 36-minute job by creating code that 
works around these issues (this includes SPARK-34843). That was 27% taken off, 
which would be huge at a larger scale.

 

The lower bound will have less of a performance impact (if you don't hit 
memory issues) since other tasks will still be processing while the first 
partition is taking a long time. However, my question would be why skip using 
the lower bound anyway? It would make more sense to use what the user 
supplied.
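
For illustration, a simplified sketch of the boundary loop with the example 
above (data 0-10, 10 partitions, lowerBound = 1, so stride = 1); this is my 
own reconstruction, not Spark's actual code, and "id" is a placeholder column:

{code}
val lowerBound = 1L
val stride = 1L
val numPartitions = 10
var currentValue = lowerBound
(0 until numPartitions).foreach { i =>
  val lBound = if (i != 0) s"id >= $currentValue" else ""
  currentValue += stride  // current behaviour: also advances before partition 0's upper bound
  val uBound = if (i != numPartitions - 1) s"id < $currentValue" else ""
  println(Seq(lBound, uBound).filter(_.nonEmpty).mkString(" AND "))
}
// First predicate printed: "id < 2". Guarding the increment with `if (i != 0)`
// would yield "id < 1" instead, i.e. the supplied lower bound.
{code}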

> JDBCRelation columnPartition function includes the first stride in the lower 
> partition
> --
>
> Key: SPARK-34844
> URL: https://issues.apache.org/jira/browse/SPARK-34844
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
>
> Currently, columnPartition in JDBCRelation contains logic that adds the first 
> stride into the lower partition. Because of this, the lower bound isn't used 
> as the ceiling for the lower partition.
> For example, say we have data 0-10, 10 partitions, and the lowerBound is set 
> to 1. The lower/first partition should contain anything < 1. However, in the 
> current implementation, it would include anything < 2.
> A possible easy fix would be changing the following code on line 132:
> currentValue += stride
> To:
> if (i != 0) currentValue += stride
> Or include currentValue += stride within the if statement on line 131... 
> although this creates a pretty bad looking side-effect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Takeshi Yamamuro (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307483#comment-17307483
 ] 

Takeshi Yamamuro commented on SPARK-34843:
--

How about `upperBound=Long.MaxValue, lowerBound=Long.MinValue`?
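
For concreteness, a quick Scala check of that case (a sketch): subtracting the 
bounds directly overflows, while dividing each bound first, as the current 
formula does, stays within the Long range.

{code}
val upperBound = Long.MaxValue
val lowerBound = Long.MinValue
val numPartitions = 1000L

// Proposed style: the true width (2^64 - 1) does not fit in a Long.
println(upperBound - lowerBound)  // -1, due to Long overflow

// Current style: each bound is divided first, so no overflow occurs.
println(upperBound / numPartitions - lowerBound / numPartitions)  // 18446744073709550
{code}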

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = (upperBound - lowerBound) / (numPartitions - 2)
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> (18563L - -15611) / (1000L - 2) = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition

2021-03-23 Thread Takeshi Yamamuro (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307479#comment-17307479
 ] 

Takeshi Yamamuro commented on SPARK-34844:
--

Can the current logic cause any actual issue? I feel there is no big 
difference between the current one and the proposed one, though.

> JDBCRelation columnPartition function includes the first stride in the lower 
> partition
> --
>
> Key: SPARK-34844
> URL: https://issues.apache.org/jira/browse/SPARK-34844
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
>
> Currently, columnPartition in JDBCRelation contains logic that adds the first 
> stride into the lower partition. Because of this, the lower bound isn't used 
> as the ceiling for the lower partition.
> For example, say we have data 0-10, 10 partitions, and the lowerBound is set 
> to 1. The lower/first partition should contain anything < 1. However, in the 
> current implementation, it would include anything < 2.
> A possible easy fix would be changing the following code on line 132:
> currentValue += stride
> To:
> if (i != 0) currentValue += stride
> Or include currentValue += stride within the if statement on line 131... 
> although this creates a pretty bad looking side-effect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307477#comment-17307477
 ] 

Jason Yarbrough commented on SPARK-34843:
-

From what I can see, there is protection in the code above the stride formula 
that makes sure the partition count isn't greater than upperBound - 
lowerBound. There is also code above that to make sure that someone doesn't 
pass in a numPartitions of 1.

I'm having a hard time wrapping my head around where this proposed formula 
would cause an issue, but I'm hoping you, or someone, could help clarify. Would 
you be able to provide an example of the overflow issue in this case?

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = (upperBound - lowerBound) / (numPartitions - 2)
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> (18563L - -15611) / (1000L - 2) = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34835) Support TimestampType in UDT

2021-03-23 Thread Takeshi Yamamuro (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307471#comment-17307471
 ] 

Takeshi Yamamuro commented on SPARK-34835:
--

Please fill the description.

> Support TimestampType in UDT
> 
>
> Key: SPARK-34835
> URL: https://issues.apache.org/jira/browse/SPARK-34835
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.0.2, 3.1.1
>Reporter: Darcy Shen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Takeshi Yamamuro (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307469#comment-17307469
 ] 

Takeshi Yamamuro commented on SPARK-34843:
--

To mitigate overflow issues, I think we should keep the current formula.

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = (upperBound - lowerBound) / (numPartitions - 2)
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> (18563L - -15611) / (1000L - 2) = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34843:

Attachment: SPARK-34843.patch

> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
> Attachments: SPARK-34843.patch
>
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = (upperBound - lowerBound) / (numPartitions - 2)
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-06-05 (17323). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> (18563L - -15611) / (1000L - 2) = 34
> This would put the upper bound at 2020-02-28 (18321), which is much closer to 
> the original supplied upper bound. This is the best you can do to get as 
> close as possible to the upper bound (without adjusting the number of 
> partitions). For example, a stride size of 35 would go well past the supplied 
> upper bound (over 2 years, 2022-11-22).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34844:

Description: 
Currently, columnPartition in JDBCRelation contains logic that adds the first 
stride into the lower partition. Because of this, the lower bound isn't used as 
the ceiling for the lower partition.

For example, say we have data 0-10, 10 partitions, and the lowerBound is set to 
1. The lower/first partition should contain anything < 1. However, in the 
current implementation, it would include anything < 2.

A possible easy fix would be changing the following code on line 132:

currentValue += stride

To:

if (i != 0) currentValue += stride

Or include currentValue += stride within the if statement on line 131... 
although this creates a pretty bad looking side-effect.

  was:
Currently, columnPartition in JDBCRelation contains logic that adds the first 
stride into the lower partition. Because of this, the lower bound isn't used as 
the ceiling for the lower partition.

For example, say we have data 0-10, 10 partitions, and the lowerBound is set to 
1. The lower/first partition should contain anything < 1. However, in the 
current implementation, it would include anything < 2.

A possible easy fix would be changing the following code on line 132:

currentValue += stride

To:

if (i != 0) currentValue += stride

Or include currentValue += stride within the if statement on line 131... 
although this creates a pretty nasty looking side-effect.


> JDBCRelation columnPartition function includes the first stride in the lower 
> partition
> --
>
> Key: SPARK-34844
> URL: https://issues.apache.org/jira/browse/SPARK-34844
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
>
> Currently, columnPartition in JDBCRelation contains logic that adds the first 
> stride into the lower partition. Because of this, the lower bound isn't used 
> as the ceiling for the lower partition.
> For example, say we have data 0-10, 10 partitions, and the lowerBound is set 
> to 1. The lower/first partition should contain anything < 1. However, in the 
> current implementation, it would include anything < 2.
> A possible easy fix would be changing the following code on line 132:
> currentValue += stride
> To:
> if (i != 0) currentValue += stride
> Or include currentValue += stride within the if statement on line 131... 
> although this creates a pretty bad looking side-effect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34843:

Description: 
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = (upperBound - lowerBound) / (numPartitions - 2)

 

An example (using a date column):

Say you're creating 1,000 partitions. If you provide a lower bound of 
1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
(translated to 18563), Spark determines the stride size as follows:

 

(18563L / 1000L) - (-15611 / 1000L) = 33

Starting from the lower bound, doing 998 strides of 33, you end up at 
2017-06-05 (17323). This is over 3 years of extra data that will go into the 
last partition, and depending on the shape of the data could cause a very long 
running task at the end of a job.

 

Using the formula I'm proposing, you'd get:

(18563L - -15611) / (1000L - 2) = 34

This would put the upper bound at 2020-02-28 (18321), which is much closer to 
the original supplied upper bound. This is the best you can do to get as close 
as possible to the upper bound (without adjusting the number of partitions). 
For example, a stride size of 35 would go well past the supplied upper bound 
(over 2 years, 2022-11-22).

 

In the above example, there is only a difference of 1 between the stride size 
using the current formula and the stride size using the proposed formula, but 
with greater distance between the lower and upper bounds, or a lower number of 
partitions, the difference can be much greater. 

  was:
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)

 

An example (using a date column):

Say you're creating 1,000 partitions. If you provide a lower bound of 
1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
(translated to 18563), Spark determines the stride size as follows:

 

(18563L / 1000L) - (-15611 / 1000L) = 33

Starting from the lower bound, doing 998 strides of 33, you end up at 
2017-06-05 (17323). This is over 3 years of extra data that will go into the 
last partition, and depending on the shape of the data could cause a very long 
running task at the end of a job.

 

Using the formula I'm proposing, you'd get:

(18563L - -15611) / (1000L - 1) = 34

This would put the upper bound at 2020-02-28 (18321), which is much closer to 
the original supplied upper bound. This is the best you can do to get as close 
as possible to the upper bound (without adjusting the number of partitions). 
For example, a stride size of 35 would go well past the supplied upper bound 
(over 2 years, 2022-11-22).

 

In the above example, there is only a difference of 1 between the stride size 
using the current formula and the stride size using the proposed formula, but 
with greater distance between the lower and upper bounds, or a lower number of 
partitions, the difference can be much greater. 


> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = (upperBound - lowerBound) / (numPartitions - 2)
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets 

[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34843:

Description: 
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)

 

An example (using a date column):

Say you're creating 1,000 partitions. If you provide a lower bound of 
1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
(translated to 18563), Spark determines the stride size as follows:

 

(18563L / 1000L) - (-15611 / 1000L) = 33

Starting from the lower bound, doing 998 strides of 33, you end up at 
2017-06-05 (17323). This is over 3 years of extra data that will go into the 
last partition, and depending on the shape of the data could cause a very long 
running task at the end of a job.

 

Using the formula I'm proposing, you'd get:

(18563L - -15611) / (1000L - 1) = 34

This would put the upper bound at 2020-02-28 (18321), which is much closer to 
the original supplied upper bound. This is the best you can do to get as close 
as possible to the upper bound (without adjusting the number of partitions). 
For example, a stride size of 35 would go well past the supplied upper bound 
(over 2 years, 2022-11-22).

 

In the above example, there is only a difference of 1 between the stride size 
using the current formula and the stride size using the proposed formula, but 
with greater distance between the lower and upper bounds, or a lower number of 
partitions, the difference can be much greater. 

  was:
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)

 

An example (using a date column):

Say you're creating 1,000 partitions. If you provide a lower bound of 
1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
(translated to 18563), Spark determines the stride size as follows:

 

(18563L / 1000L) - (-15611 / 1000L) = 33

Starting from the lower bound, doing 998 strides of 33, you end up at 
2017-06-05 (17323). This is over 3 years of extra data that will go into the 
last partition, and depending on the shape of the data could cause a very long 
running task at the end of a job.

 

Using the formula I'm proposing, you'd get:

(18563L - -15611) / (1000L - 1) = 34

This would put the upper bound at 2020-02-28 (18321), which is much closer to 
the original supplied upper bound. This is the best you can do to get as close 
as possible to the upper bound (without adjusting the number of partitions). 
For example, a stride size of 35 would go well past the supplied upper bound 
(over 2 years).

 

In the above example, there is only a difference of 1 between the stride size 
using the current formula and the stride size using the proposed formula, but 
with greater distance between the lower and upper bounds, or a lower number of 
partitions, the difference can be much greater. 


> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to 

[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34843:

Description: 
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)

 

An example (using a date column):

Say you're creating 1,000 partitions. If you provide a lower bound of 
1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
(translated to 18563), Spark determines the stride size as follows:

 

(18563L / 1000L) - (-15611 / 1000L) = 33

Starting from the lower bound, doing 998 strides of 33, you end up at 
2017-06-05 (17323). This is over 3 years of extra data that will go into the 
last partition, and depending on the shape of the data could cause a very long 
running task at the end of a job.

 

Using the formula I'm proposing, you'd get:

(18563L - -15611) / (1000L - 1) = 34

This would put the upper bound at 2020-02-28 (18321), which is much closer to 
the original supplied upper bound. This is the best you can do to get as close 
as possible to the upper bound (without adjusting the number of partitions). 
For example, a stride size of 35 would go well past the supplied upper bound 
(over 2 years).

 

In the above example, there is only a difference of 1 between the stride size 
using the current formula and the stride size using the proposed formula, but 
with greater distance between the lower and upper bounds, or a lower number of 
partitions, the difference can be much greater. 

  was:
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)

 

An example (using a date column):

Say you're creating 1,000 partitions. If you provide a lower bound of 
1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
(translated to 18563), Spark determines the stride size as follows:

 

(18563L / 1000L) - (-15611 / 1000L) = 33

Starting from the lower bound, doing 998 strides of 33, you end up at 
2017-07-08 (17356). This is over 3 years of extra data that will go into the 
last partition, and depending on the shape of the data could cause a very long 
running task at the end of a job.

 

Using the formula I'm proposing, you'd get:

(18563L - -15611) / (1000L - 1) = 34

This would put the upper bound at 2020-04-02, which is much closer to the 
original supplied upper bound. This is the best you can do to get as close as 
possible to the upper bound (without adjusting the number of partitions). For 
example, a stride size of 35 would go well past the supplied upper bound (over 
2 years).

 

In the above example, there is only a difference of 1 between the stride size 
using the current formula and the stride size using the proposed formula, but 
with greater distance between the lower and upper bounds, or a lower number of 
partitions, the difference can be much greater. 


> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an 

[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Yarbrough updated SPARK-34843:

Description: 
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)

 

An example (using a date column):

Say you're creating 1,000 partitions. If you provide a lower bound of 
1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
(translated to 18563), Spark determines the stride size as follows:

 

(18563L / 1000L) - (-15611 / 1000L) = 33

Starting from the lower bound, doing 998 strides of 33, you end up at 
2017-07-08 (17356). This is over 3 years of extra data that will go into the 
last partition, and depending on the shape of the data could cause a very long 
running task at the end of a job.

 

Using the formula I'm proposing, you'd get:

(18563L - -15611) / (1000L - 1) = 34

This would put the upper bound at 2020-04-02, which is much closer to the 
original supplied upper bound. This is the best you can do to get as close as 
possible to the upper bound (without adjusting the number of partitions). For 
example, a stride size of 35 would go well past the supplied upper bound (over 
2 years).

 

In the above example, there is only a difference of 1 between the stride size 
using the current formula and the stride size using the proposed formula, but 
with greater distance between the lower and upper bounds, or a lower number of 
partitions, the difference can be much greater. 

  was:
Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)


> JDBCRelation columnPartition function improperly determines stride size
> ---
>
> Key: SPARK-34843
> URL: https://issues.apache.org/jira/browse/SPARK-34843
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Jason Yarbrough
>Priority: Minor
>
> Currently, in JDBCRelation (line 123), the stride size is calculated as 
> follows:
> val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
>  
> Due to truncation happening on both divisions, the stride size can fall short 
> of what it should be. This can lead to a big difference between the provided 
> upper bound and the actual start of the last partition.
> I propose this formula, as it is much more accurate and leads to better 
> distribution:
> val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)
>  
> An example (using a date column):
> Say you're creating 1,000 partitions. If you provide a lower bound of 
> 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 
> (translated to 18563), Spark determines the stride size as follows:
>  
> (18563L / 1000L) - (-15611 / 1000L) = 33
> Starting from the lower bound, doing 998 strides of 33, you end up at 
> 2017-07-08 (17356). This is over 3 years of extra data that will go into the 
> last partition, and depending on the shape of the data could cause a very 
> long running task at the end of a job.
>  
> Using the formula I'm proposing, you'd get:
> (18563L - -15611) / (1000L - 1) = 34
> This would put the upper bound at 2020-04-02, which is much closer to the 
> original supplied upper bound. This is the best you can do to get as close as 
> possible to the upper bound (without adjusting the number of partitions). For 
> example, a stride size of 35 would go well past the supplied upper bound 
> (over 2 years).
>  
> In the above example, there is only a difference of 1 between the stride size 
> using the current formula and the stride size using the proposed formula, but 
> with greater distance between the lower and upper bounds, or a lower number 
> of partitions, the difference can be much greater. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To 

[jira] [Commented] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307456#comment-17307456
 ] 

Apache Spark commented on SPARK-34845:
--

User 'baohe-zhang' has created a pull request for this issue:
https://github.com/apache/spark/pull/31945

> ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of 
> child pids metrics are missing
> 
>
> Key: SPARK-34845
> URL: https://issues.apache.org/jira/browse/SPARK-34845
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>Reporter: Baohe Zhang
>Priority: Major
>
> When the procfs metrics of some child pids are unavailable, 
> ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum 
> of a subset of child pids), instead of an all 0 result. This can be 
> misleading and is undesired per the current code comments in 
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].
> How to reproduce it?
> This unit test is kind of self-explanatory:
> {code:java}
> val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
> val mockedP = spy(p)
> // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
> returned
> var ptree = Set(26109, 22764, 22763)
> when(mockedP.computeProcessTree).thenReturn(ptree)
> var r = mockedP.computeAllMetrics
> assert(r.jvmVmemTotal == 0)
> assert(r.jvmRSSTotal == 0)
> assert(r.pythonVmemTotal == 0)
> assert(r.pythonRSSTotal == 0)
> {code}
> In the current implementation, computeAllMetrics will reset the allMetrics to 
> 0 when processing 22764 because 22764's proc file doesn't exist, but then it 
> will continue processing pid 22763, and update allMetrics to procfs metrics 
> of pid 22763.
> Also, a side effect of this bug is that it can lead to verbose warning logs 
> if many pids' stat files are missing. Terminating early makes the warning 
> logs more concise.
> How to solve it?
> The issue can be fixed by throwing IOException up to computeAllMetrics(); in 
> that case, computeAllMetrics can tell that at least one child pid's procfs 
> metrics are missing and then terminate the metrics reporting.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34845:


Assignee: Apache Spark

> ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of 
> child pids metrics are missing
> 
>
> Key: SPARK-34845
> URL: https://issues.apache.org/jira/browse/SPARK-34845
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>Reporter: Baohe Zhang
>Assignee: Apache Spark
>Priority: Major
>
> When the procfs metrics of some child pids are unavailable, 
> ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum 
> of a subset of child pids), instead of an all 0 result. This can be 
> misleading and is undesired per the current code comments in 
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].
> How to reproduce it?
> This unit test is kind of self-explanatory:
> {code:java}
> val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
> val mockedP = spy(p)
> // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
> returned
> var ptree = Set(26109, 22764, 22763)
> when(mockedP.computeProcessTree).thenReturn(ptree)
> var r = mockedP.computeAllMetrics
> assert(r.jvmVmemTotal == 0)
> assert(r.jvmRSSTotal == 0)
> assert(r.pythonVmemTotal == 0)
> assert(r.pythonRSSTotal == 0)
> {code}
> In the current implementation, computeAllMetrics will reset the allMetrics to 
> 0 when processing 22764 because 22764's proc file doesn't exist, but then it 
> will continue processing pid 22763, and update allMetrics to procfs metrics 
> of pid 22763.
> Also, a side effect of this bug is that it can lead to verbose warning logs 
> if many pids' stat files are missing. Terminating early makes the warning 
> logs more concise.
> How to solve it?
> The issue can be fixed by throwing IOException up to computeAllMetrics(); in 
> that case, computeAllMetrics can tell that at least one child pid's procfs 
> metrics are missing and then terminate the metrics reporting.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34845:


Assignee: (was: Apache Spark)

> ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of 
> child pids metrics are missing
> 
>
> Key: SPARK-34845
> URL: https://issues.apache.org/jira/browse/SPARK-34845
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>Reporter: Baohe Zhang
>Priority: Major
>
> When the procfs metrics of some child pids are unavailable, 
> ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum 
> of a subset of child pids), instead of an all 0 result. This can be 
> misleading and is undesired per the current code comments in 
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].
> How to reproduce it?
> This unit test is kind of self-explanatory:
> {code:java}
> val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
> val mockedP = spy(p)
> // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
> returned
> var ptree = Set(26109, 22764, 22763)
> when(mockedP.computeProcessTree).thenReturn(ptree)
> var r = mockedP.computeAllMetrics
> assert(r.jvmVmemTotal == 0)
> assert(r.jvmRSSTotal == 0)
> assert(r.pythonVmemTotal == 0)
> assert(r.pythonRSSTotal == 0)
> {code}
> In the current implementation, computeAllMetrics will reset the allMetrics to 
> 0 when processing 22764 because 22764's proc file doesn't exist, but then it 
> will continue processing pid 22763, and update allMetrics to procfs metrics 
> of pid 22763.
> Also, a side effect of this bug is that it can lead to verbose warning logs 
> if many pids' stat files are missing. Terminating early makes the warning 
> logs more concise.
> How to solve it?
> The issue can be fixed by throwing IOException up to computeAllMetrics(); in 
> that case, computeAllMetrics can tell that at least one child pid's procfs 
> metrics are missing and then terminate the metrics reporting.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307454#comment-17307454
 ] 

Apache Spark commented on SPARK-34845:
--

User 'baohe-zhang' has created a pull request for this issue:
https://github.com/apache/spark/pull/31945

> ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of 
> child pids metrics are missing
> 
>
> Key: SPARK-34845
> URL: https://issues.apache.org/jira/browse/SPARK-34845
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>Reporter: Baohe Zhang
>Priority: Major
>
> When the procfs metrics of some child pids are unavailable, 
> ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum 
> of a subset of child pids), instead of an all 0 result. This can be 
> misleading and is undesired per the current code comments in 
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].
> How to reproduce it?
> This unit test is kind of self-explanatory:
> {code:java}
> val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
> val mockedP = spy(p)
> // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
> returned
> var ptree = Set(26109, 22764, 22763)
> when(mockedP.computeProcessTree).thenReturn(ptree)
> var r = mockedP.computeAllMetrics
> assert(r.jvmVmemTotal == 0)
> assert(r.jvmRSSTotal == 0)
> assert(r.pythonVmemTotal == 0)
> assert(r.pythonRSSTotal == 0)
> {code}
> In the current implementation, computeAllMetrics will reset the allMetrics to 
> 0 when processing 22764 because 22764's proc file doesn't exist, but then it 
> will continue processing pid 22763, and update allMetrics to procfs metrics 
> of pid 22763.
> Also, a side effect of this bug is that it can lead to verbose warning logs 
> if many pids' stat files are missing. Terminating early makes the warning 
> logs more concise.
> How to solve it?
> The issue can be fixed by throwing IOException up to computeAllMetrics(); in 
> that case, computeAllMetrics can tell that at least one child pid's procfs 
> metrics are missing and then terminate the metrics reporting.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-03-23 Thread Devaraj Kavali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307453#comment-17307453
 ] 

Devaraj Kavali commented on SPARK-18105:


[~dongjoon] We are seeing this error while running workloads with ~10TB of 
shuffle data; we are not setting spark.io.encryption.enabled=true. 

> LZ4 failed to decompress a stream of shuffled data
> --
>
> Key: SPARK-18105
> URL: https://issues.apache.org/jira/browse/SPARK-18105
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: Davies Liu
>Priority: Major
>
> When lz4 is used to compress the shuffle files, it may fail to decompress it 
> as "stream is corrupt"
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in 
> stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at 
> org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing

2021-03-23 Thread Baohe Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Baohe Zhang updated SPARK-34845:

Description: 
When the procfs metrics of some child pids are unavailable, 
ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum of 
a subset of child pids), instead of an all 0 result. This can be misleading and 
is undesired per the current code comments in 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].

How to reproduce it?

This unit test is kind of self-explanatory:

 
{code:java}
val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
val mockedP = spy(p)

// proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
returned
var ptree = Set(26109, 22764, 22763)
when(mockedP.computeProcessTree).thenReturn(ptree)
var r = mockedP.computeAllMetrics
assert(r.jvmVmemTotal == 0)
assert(r.jvmRSSTotal == 0)
assert(r.pythonVmemTotal == 0)
assert(r.pythonRSSTotal == 0)
{code}
In the current implementation, computeAllMetrics will reset the allMetrics to 0 
when processing 22764 because 22764's proc file doesn't exist, but then it will 
continue processing pid 22763, and update allMetrics to procfs metrics of pid 
22763.

Also, a side effect of this bug is that it can lead to verbose warning logs if 
many pids' stat files are missing. Terminating early makes the warning logs 
more concise.

How to solve it?

The issue can be fixed by throwing IOException up to computeAllMetrics(); in 
that case, computeAllMetrics can tell that at least one child pid's procfs 
metrics are missing and then terminate the metrics reporting.
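
As a rough illustration of that idea (a hypothetical, heavily simplified 
sketch, not the real ProcfsMetricsGetter), the per-pid reader throws 
IOException when a stat file is missing, and the aggregator stops at the first 
failure and reports all zeros instead of a partial sum:

{code:java}
import java.io.{File, IOException}

// Hypothetical stand-in for the executor procfs metrics snapshot.
case class AllMetrics(jvmVmemTotal: Long = 0L, jvmRSSTotal: Long = 0L)

object ProcfsSketch {
  // Throws IOException if the pid's stat file is missing, instead of swallowing it.
  private def readOneProcess(pid: Int): AllMetrics = {
    val stat = new File(s"/proc/$pid/stat")
    if (!stat.exists()) throw new IOException(s"missing $stat")
    AllMetrics(jvmVmemTotal = 1L, jvmRSSTotal = 1L) // real parsing omitted
  }

  // Terminates early on the first missing pid and returns the all-zero result.
  def computeAllMetrics(pids: Set[Int]): AllMetrics =
    try {
      pids.foldLeft(AllMetrics()) { (acc, pid) =>
        val m = readOneProcess(pid)
        AllMetrics(acc.jvmVmemTotal + m.jvmVmemTotal, acc.jvmRSSTotal + m.jvmRSSTotal)
      }
    } catch {
      case _: IOException => AllMetrics()
    }
}
{code}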

  was:
When the procfs metrics of some child pids are unavailable, 
ProcfsMetricsGetter.computeAllMetrics() returns partial metrics (the sum of a 
subset of child pids), instead of an all 0 result. This can be misleading and 
is undesired per the current code comments in 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].

 

Also, a side effect of it is that it can lead to verbose warning logs if many 
pids' stat files are missing.
{code:java}
e.g.2021-03-21 16:58:25,422 [pool-26-thread-8] WARN  
org.apache.spark.executor.ProcfsMetricsGetter  - There was a problem with 
reading the stat file of the process. java.io.FileNotFoundException: 
/proc/742/stat (No such file or directory) at 
java.io.FileInputStream.open0(Native Method) at 
java.io.FileInputStream.open(FileInputStream.java:195) at 
java.io.FileInputStream.(FileInputStream.java:138) at 
org.apache.spark.executor.ProcfsMetricsGetter.openReader$1(ProcfsMetricsGetter.scala:203)
 at 
org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$addProcfsMetricsFromOneProcess$1(ProcfsMetricsGetter.scala:205)
 at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2540) at 
org.apache.spark.executor.ProcfsMetricsGetter.addProcfsMetricsFromOneProcess(ProcfsMetricsGetter.scala:205)
 at 
org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$computeAllMetrics$1(ProcfsMetricsGetter.scala:297)
{code}
The issue can be fixed by updating the flag isAvailable to false when one of 
the child pids' procfs metrics is unavailable. The other methods computePid, 
computePageSize, and getChildPids already behave this way.

Summary: ProcfsMetricsGetter.computeAllMetrics may return partial 
metrics when some of child pids metrics are missing  (was: 
ProcfsMetricsGetter.computeAllMetrics shouldn't return partial metrics when 
some of child pids metrics are missing)

> ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of 
> child pids metrics are missing
> 
>
> Key: SPARK-34845
> URL: https://issues.apache.org/jira/browse/SPARK-34845
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>Reporter: Baohe Zhang
>Priority: Major
>
> When the procfs metrics of some child pids are unavailable, 
> ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum 
> of a subset of child pids), instead of an all 0 result. This can be 
> misleading and is undesired per the current code comments in 
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].
> How to reproduce it?
> This unit test is kind of self-explanatory:
>  
> {code:java}
> val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
> val mockedP = spy(p)
> // proc file of pid 

[jira] [Updated] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing

2021-03-23 Thread Baohe Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Baohe Zhang updated SPARK-34845:

Description: 
When the procfs metrics of some child pids are unavailable, 
ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum of 
a subset of child pids), instead of an all 0 result. This can be misleading and 
is undesired per the current code comments in 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].

How to reproduce it?

This unit test is kind of self-explanatory:
{code:java}
val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
val mockedP = spy(p)

// proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
returned
var ptree = Set(26109, 22764, 22763)
when(mockedP.computeProcessTree).thenReturn(ptree)
var r = mockedP.computeAllMetrics
assert(r.jvmVmemTotal == 0)
assert(r.jvmRSSTotal == 0)
assert(r.pythonVmemTotal == 0)
assert(r.pythonRSSTotal == 0)
{code}
In the current implementation, computeAllMetrics will reset the allMetrics to 0 
when processing 22764 because 22764's proc file doesn't exist, but then it will 
continue processing pid 22763, and update allMetrics to procfs metrics of pid 
22763.

Also, a side effect of this bug is that it can lead to verbose warning logs if 
many pids' stat files are missing. Terminating early makes the warning logs 
more concise.

How to solve it?

The issue can be fixed by throwing IOException up to computeAllMetrics(); in 
that case, computeAllMetrics can tell that at least one child pid's procfs 
metrics are missing and then terminate the metrics reporting.

  was:
When the procfs metrics of some child pids are unavailable, 
ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum of 
a subset of child pids), instead of an all 0 result. This can be misleading and 
is undesired per the current code comments in 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].

How to reproduce it?

This unit test is kind of self-explanatory:

 
{code:java}
val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
val mockedP = spy(p)

// proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
returned
var ptree = Set(26109, 22764, 22763)
when(mockedP.computeProcessTree).thenReturn(ptree)
var r = mockedP.computeAllMetrics
assert(r.jvmVmemTotal == 0)
assert(r.jvmRSSTotal == 0)
assert(r.pythonVmemTotal == 0)
assert(r.pythonRSSTotal == 0)
{code}
In the current implementation, computeAllMetrics will reset the allMetrics to 0 
when processing 22764 because 22764's proc file doesn't exist, but then it will 
continue processing pid 22763, and update allMetrics to procfs metrics of pid 
22763.

Also, a side effect of this bug is that it can lead to verbose warning logs if 
many pids' stat files are missing. Terminating early makes the warning logs 
more concise.

How to solve it?

The issue can be fixed by throwing IOException up to computeAllMetrics(); in 
that case, computeAllMetrics can tell that at least one child pid's procfs 
metrics are missing and then terminate the metrics reporting.


> ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of 
> child pids metrics are missing
> 
>
> Key: SPARK-34845
> URL: https://issues.apache.org/jira/browse/SPARK-34845
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>Reporter: Baohe Zhang
>Priority: Major
>
> When the procfs metrics of some child pids are unavailable, 
> ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum 
> of a subset of child pids), instead of an all 0 result. This can be 
> misleading and is undesired per the current code comments in 
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].
> How to reproduce it?
> This unit test is kind of self-explanatory:
> {code:java}
> val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
> val mockedP = spy(p)
> // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
> returned
> var ptree = Set(26109, 22764, 22763)
> when(mockedP.computeProcessTree).thenReturn(ptree)
> var r = mockedP.computeAllMetrics
> assert(r.jvmVmemTotal == 0)
> assert(r.jvmRSSTotal == 0)
> assert(r.pythonVmemTotal == 0)
> assert(r.pythonRSSTotal == 0)
> {code}
> In the current implementation, computeAllMetrics will reset the 

[jira] [Updated] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing

2021-03-23 Thread Baohe Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Baohe Zhang updated SPARK-34845:

Description: 
When the procfs metrics of some child pids are unavailable, 
ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum of 
a subset of child pids), instead of an all 0 result. This can be misleading and 
is undesired per the current code comments in 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].

How to reproduce it?

This unit test is kind of self-explanatory:
{code:java}
val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
val mockedP = spy(p)

// proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
returned
var ptree = Set(26109, 22764, 22763)
when(mockedP.computeProcessTree).thenReturn(ptree)
var r = mockedP.computeAllMetrics
assert(r.jvmVmemTotal == 0)
assert(r.jvmRSSTotal == 0)
assert(r.pythonVmemTotal == 0)
assert(r.pythonRSSTotal == 0)
{code}
In the current implementation, computeAllMetrics will reset the allMetrics to 0 
when processing 22764 because 22764's proc file doesn't exist, but then it will 
continue processing pid 22763, and update allMetrics to procfs metrics of pid 
22763.

Also, a side effect of this bug is that it can lead to verbose warning logs if 
many pids' stat files are missing. Terminating early makes the warning logs 
more concise.

How to solve it?

The issue can be fixed by throwing IOException up to computeAllMetrics(); in 
that case, computeAllMetrics can tell that at least one child pid's procfs 
metrics are missing and then terminate the metrics reporting.

  was:
When the procfs metrics of some child pids are unavailable, 
ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum of 
a subset of child pids), instead of an all 0 result. This can be misleading and 
is undesired per the current code comments in 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].

How to reproduce it?

This unit test is kind of self-explanatory:
{code:java}
val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
val mockedP = spy(p)

// proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
returned
var ptree = Set(26109, 22764, 22763)
when(mockedP.computeProcessTree).thenReturn(ptree)
var r = mockedP.computeAllMetrics
assert(r.jvmVmemTotal == 0)
assert(r.jvmRSSTotal == 0)
assert(r.pythonVmemTotal == 0)
assert(r.pythonRSSTotal == 0)
{code}
In the current implementation, computeAllMetrics will reset the allMetrics to 0 
when processing 22764 because 22764's proc file doesn't exist, but then it will 
continue processing pid 22763, and update allMetrics to procfs metrics of pid 
22763.

Also, a side effect of this bug is that it can lead to verbose warning logs if 
many pids' stat files are missing. Terminating early makes the warning logs 
more concise.

How to solve it?

The issue can be fixed by throwing IOException up to computeAllMetrics(); in 
that case, computeAllMetrics can tell that at least one child pid's procfs 
metrics are missing and then terminate the metrics reporting.


> ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of 
> child pids metrics are missing
> 
>
> Key: SPARK-34845
> URL: https://issues.apache.org/jira/browse/SPARK-34845
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>Reporter: Baohe Zhang
>Priority: Major
>
> When the procfs metrics of some child pids are unavailable, 
> ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum 
> of a subset of child pids), instead of an all 0 result. This can be 
> misleading and is undesired per the current code comments in 
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].
> How to reproduce it?
> This unit test is kind of self-explanatory:
> {code:java}
> val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
> val mockedP = spy(p)
> // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be 
> returned
> var ptree = Set(26109, 22764, 22763)
> when(mockedP.computeProcessTree).thenReturn(ptree)
> var r = mockedP.computeAllMetrics
> assert(r.jvmVmemTotal == 0)
> assert(r.jvmRSSTotal == 0)
> assert(r.pythonVmemTotal == 0)
> assert(r.pythonRSSTotal == 0)
> {code}
> In the current implementation, computeAllMetrics will reset the allMetrics 

[jira] [Updated] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics shouldn't return partial metrics when some of child pids metrics are missing

2021-03-23 Thread Baohe Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Baohe Zhang updated SPARK-34845:

Description: 
When the procfs metrics of some child pids are unavailable, 
ProcfsMetricsGetter.computeAllMetrics() returns partial metrics (the sum of a 
subset of child pids), instead of an all 0 result. This can be misleading and 
is undesired per the current code comments in 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].

 

Also, a side effect of it is that it can lead to verbose warning logs if many 
pids' stat files are missing.
{code:java}
e.g.2021-03-21 16:58:25,422 [pool-26-thread-8] WARN  
org.apache.spark.executor.ProcfsMetricsGetter  - There was a problem with 
reading the stat file of the process. java.io.FileNotFoundException: 
/proc/742/stat (No such file or directory) at 
java.io.FileInputStream.open0(Native Method) at 
java.io.FileInputStream.open(FileInputStream.java:195) at 
java.io.FileInputStream.(FileInputStream.java:138) at 
org.apache.spark.executor.ProcfsMetricsGetter.openReader$1(ProcfsMetricsGetter.scala:203)
 at 
org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$addProcfsMetricsFromOneProcess$1(ProcfsMetricsGetter.scala:205)
 at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2540) at 
org.apache.spark.executor.ProcfsMetricsGetter.addProcfsMetricsFromOneProcess(ProcfsMetricsGetter.scala:205)
 at 
org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$computeAllMetrics$1(ProcfsMetricsGetter.scala:297)
{code}
The issue can be fixed by updating the flag isAvailable to false when one of 
the child pids' procfs metrics is unavailable. The other methods computePid, 
computePageSize, and getChildPids already behave this way.

  was:
When the procfs metrics of some child pids are unavailable, 
ProcfsMetricsGetter.computeAllMetrics() returns partial metrics (the sum of a 
subset of child pids), instead of an all 0 result. This can be misleading and 
is undesired per the current code comments in 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].

 

Also, a side effect of it is that it can lead to verbose warning logs if many 
pids' stat files are missing.
{noformat}
e.g.2021-03-21 16:58:25,422 [pool-26-thread-8] WARN  
org.apache.spark.executor.ProcfsMetricsGetter  - There was a problem with 
reading the stat file of the process. java.io.FileNotFoundException: 
/proc/742/stat (No such file or directory) at 
java.io.FileInputStream.open0(Native Method) at 
java.io.FileInputStream.open(FileInputStream.java:195) at 
java.io.FileInputStream.(FileInputStream.java:138) at 
org.apache.spark.executor.ProcfsMetricsGetter.openReader$1(ProcfsMetricsGetter.scala:203)
 at 
org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$addProcfsMetricsFromOneProcess$1(ProcfsMetricsGetter.scala:205)
 at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2540) at 
org.apache.spark.executor.ProcfsMetricsGetter.addProcfsMetricsFromOneProcess(ProcfsMetricsGetter.scala:205)
 at 
org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$computeAllMetrics$1(ProcfsMetricsGetter.scala:297){noformat}
The issue can be fixed by updating the flag isAvailable to false when one of 
the child pids' procfs metrics is unavailable. The other methods computePid, 
computePageSize, and getChildPids already behave this way.


> ProcfsMetricsGetter.computeAllMetrics shouldn't return partial metrics when 
> some of child pids metrics are missing
> --
>
> Key: SPARK-34845
> URL: https://issues.apache.org/jira/browse/SPARK-34845
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>Reporter: Baohe Zhang
>Priority: Major
>
> When the procfs metrics of some child pids are unavailable, 
> ProcfsMetricsGetter.computeAllMetrics() returns partial metrics (the sum of a 
> subset of child pids), instead of an all 0 result. This can be misleading and 
> is undesired per the current code comments in 
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].
>  
> Also, a side effect of it is that it can lead to verbose warning logs if 
> many pids' stat files are missing.
> {code:java}
> e.g.2021-03-21 16:58:25,422 [pool-26-thread-8] WARN 

[jira] [Created] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics shouldn't return partial metrics when some of child pids metrics are missing

2021-03-23 Thread Baohe Zhang (Jira)
Baohe Zhang created SPARK-34845:
---

 Summary: ProcfsMetricsGetter.computeAllMetrics shouldn't return 
partial metrics when some of child pids metrics are missing
 Key: SPARK-34845
 URL: https://issues.apache.org/jira/browse/SPARK-34845
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.1.1, 3.1.0, 3.0.2, 3.0.1, 3.0.0
Reporter: Baohe Zhang


When the procfs metrics of some child pids are unavailable, 
ProcfsMetricsGetter.computeAllMetrics() returns partial metrics (the sum of a 
subset of child pids), instead of an all 0 result. This can be misleading and 
is undesired per the current code comments in 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214].

 

Also, a side effect of it is that it can lead to verbose warning logs if many 
pids' stat files are missing.
{noformat}
e.g.2021-03-21 16:58:25,422 [pool-26-thread-8] WARN  
org.apache.spark.executor.ProcfsMetricsGetter  - There was a problem with 
reading the stat file of the process. java.io.FileNotFoundException: 
/proc/742/stat (No such file or directory) at 
java.io.FileInputStream.open0(Native Method) at 
java.io.FileInputStream.open(FileInputStream.java:195) at 
java.io.FileInputStream.(FileInputStream.java:138) at 
org.apache.spark.executor.ProcfsMetricsGetter.openReader$1(ProcfsMetricsGetter.scala:203)
 at 
org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$addProcfsMetricsFromOneProcess$1(ProcfsMetricsGetter.scala:205)
 at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2540) at 
org.apache.spark.executor.ProcfsMetricsGetter.addProcfsMetricsFromOneProcess(ProcfsMetricsGetter.scala:205)
 at 
org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$computeAllMetrics$1(ProcfsMetricsGetter.scala:297){noformat}
The issue can be fixed by updating the flag isAvailable to false when one of 
the child pids' procfs metrics is unavailable. The other methods computePid, 
computePageSize, and getChildPids already behave this way.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition

2021-03-23 Thread Jason Yarbrough (Jira)
Jason Yarbrough created SPARK-34844:
---

 Summary: JDBCRelation columnPartition function includes the first 
stride in the lower partition
 Key: SPARK-34844
 URL: https://issues.apache.org/jira/browse/SPARK-34844
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Jason Yarbrough


Currently, columnPartition in JDBCRelation contains logic that adds the first 
stride into the lower partition. Because of this, the lower bound isn't used as 
the ceiling for the lower partition.

For example, say we have data 0-10, 10 partitions, and the lowerBound is set to 
1. The lower/first partition should contain anything < 1. However, in the 
current implementation, it would include anything < 2.

A possible easy fix would be changing the following code on line 132:

currentValue += stride

To:

if (i != 0) currentValue += stride

Alternatively, currentValue += stride could be moved inside the if statement on 
line 131, although this creates a pretty nasty-looking side effect.
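
A standalone sketch of how the suggested guard changes the generated boundaries 
(illustrative only; this is not the actual columnPartition code):

{code:java}
object PartitionBoundaries {
  // Walks the strides the way the description suggests, skipping the first
  // increment so the lower bound caps the first partition.
  def boundaries(lowerBound: Long, stride: Long, numPartitions: Int): Seq[Long] = {
    var currentValue = lowerBound
    (0 until numPartitions - 1).map { i =>
      if (i != 0) currentValue += stride // proposed guard
      currentValue
    }
  }

  def main(args: Array[String]): Unit = {
    // Example from above: data 0-10, 10 partitions, lowerBound = 1, stride = 1.
    // Prints Vector(1, 2, ..., 9), so the first partition is "< 1" rather than "< 2".
    println(boundaries(lowerBound = 1L, stride = 1L, numPartitions = 10))
  }
}
{code}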



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size

2021-03-23 Thread Jason Yarbrough (Jira)
Jason Yarbrough created SPARK-34843:
---

 Summary: JDBCRelation columnPartition function improperly 
determines stride size
 Key: SPARK-34843
 URL: https://issues.apache.org/jira/browse/SPARK-34843
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Jason Yarbrough


Currently, in JDBCRelation (line 123), the stride size is calculated as follows:

val stride: Long = upperBound / numPartitions - lowerBound / numPartitions

 

Due to truncation happening on both divisions, the stride size can fall short 
of what it should be. This can lead to a big difference between the provided 
upper bound and the actual start of the last partition.

I propose this formula, as it is much more accurate and leads to better 
distribution:

val stride: Long = (upperBound - lowerBound) / (numPartitions - 1)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34674) Spark app on k8s doesn't terminate without call to sparkContext.stop() method

2021-03-23 Thread Sergey Kotlov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307398#comment-17307398
 ] 

Sergey Kotlov commented on SPARK-34674:
---

I saw it in the sources. But you are right, generally I should not rely on 
functionality that is not described in the official documentation.
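
For reference, this is the driver pattern being discussed, with an explicit 
stop in a finally block (a minimal sketch; the app name and workload are 
placeholders):

{code:java}
import org.apache.spark.sql.SparkSession

object ExplicitStopExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explicit-stop-example").getOrCreate()
    try {
      // Placeholder workload.
      val count = spark.range(1000000L).count()
      println(s"count = $count")
    } finally {
      // Without this, the non-daemon client threads mentioned above can keep
      // the driver JVM alive after main() returns.
      spark.stop()
    }
  }
}
{code}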

> Spark app on k8s doesn't terminate without call to sparkContext.stop() method
> -
>
> Key: SPARK-34674
> URL: https://issues.apache.org/jira/browse/SPARK-34674
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.1.1
>Reporter: Sergey Kotlov
>Priority: Major
>
> Hello!
>  I have run into a problem: if I don't call the method sparkContext.stop() 
> explicitly, then the Spark driver process doesn't terminate even after its 
> main method has completed. This behaviour is different from Spark on YARN, 
> where manually stopping the sparkContext is not required.
>  It looks like the problem is in using non-daemon threads, which prevent the 
> driver JVM process from terminating.
>  At least I see two non-daemon threads, if I don't call sparkContext.stop():
> {code:java}
> Thread[OkHttp kubernetes.default.svc,5,main]
> Thread[OkHttp kubernetes.default.svc Writer,5,main]
> {code}
> Could you tell please, if it is possible to solve this problem?
> Docker image from the official release of spark-3.1.1 hadoop3.2 is used.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34684) Hadoop config could not be successfully serilized from driver pods to executor pods

2021-03-23 Thread Attila Zsolt Piros (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307392#comment-17307392
 ] 

Attila Zsolt Piros commented on SPARK-34684:



> Have you tried to create a POD from a simple linux image with hadoop client 
> tools and access HDFS from command line?



> Hadoop config could not be successfully serilized from driver pods to 
> executor pods
> ---
>
> Key: SPARK-34684
> URL: https://issues.apache.org/jira/browse/SPARK-34684
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.0.1, 3.0.2
>Reporter: Yue Peng
>Priority: Major
>
> I have set HADOOP_CONF_DIR correctly, and I have verified that the hadoop 
> configs have been stored into a configmap and mounted to the driver. However, 
> the spark pi example job keeps failing because the executors do not know how 
> to talk to hdfs. I highly suspect that there is a bug causing it, as manually 
> creating a configmap storing the hadoop configs and mounting it to the 
> executors in the template file fixes the error. 
>  
> Spark submit command:
> /opt/spark-3.0/bin/spark-submit --class org.apache.spark.examples.SparkPi 
> --deploy-mode cluster --master k8s://https://10.***.18.96:6443 
> --num-executors 1 --conf spark.kubernetes.namespace=test --conf 
> spark.kubernetes.container.image= --conf 
> spark.kubernetes.driver.podTemplateFile=/opt/spark-3.0/conf/spark-driver.template
>  --conf 
> spark.kubernetes.executor.podTemplateFile=/opt/spark-3.0/conf/spark-executor.template
>   --conf spark.kubernetes.file.upload.path=/opt/spark-3.0/examples/jars 
> hdfs:///tmp/spark-examples_2.12-3.0.125067.jar 1000
>  
>  
> Error log:
>  
> 21/03/10 06:59:58 INFO TransportClientFactory: Successfully created 
> connection to 
> org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc/100.64.0.191:7078
>  after 608 ms (392 ms spent in bootstraps)
> 21/03/10 06:59:58 INFO SecurityManager: Changing view acls to: root
> 21/03/10 06:59:58 INFO SecurityManager: Changing modify acls to: root
> 21/03/10 06:59:58 INFO SecurityManager: Changing view acls groups to:
> 21/03/10 06:59:58 INFO SecurityManager: Changing modify acls groups to:
> 21/03/10 06:59:58 INFO SecurityManager: SecurityManager: authentication 
> enabled; ui acls disabled; users with view permissions: Set(root); groups 
> with view permissions: Set(); users with modify permissions: Set(root); 
> groups with modify permissions: Set()
> 21/03/10 06:59:59 INFO TransportClientFactory: Successfully created 
> connection to 
> org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc/100.64.0.191:7078
>  after 130 ms (104 ms spent in bootstraps)
> 21/03/10 06:59:59 INFO DiskBlockManager: Created local directory at 
> /var/data/spark-0f541e3d-994f-4c7a-843f-f7dac57dfc13/blockmgr-981cfb62-5b27-4d1a-8fbd-eddb466faf1d
> 21/03/10 06:59:59 INFO MemoryStore: MemoryStore started with capacity 2047.2 
> MiB
> 21/03/10 06:59:59 INFO CoarseGrainedExecutorBackend: Connecting to driver: 
> spark://coarsegrainedschedu...@org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc:7078
> 21/03/10 06:59:59 INFO ResourceUtils: 
> ==
> 21/03/10 06:59:59 INFO ResourceUtils: Resources for spark.executor:
> 21/03/10 06:59:59 INFO ResourceUtils: 
> ==
> 21/03/10 06:59:59 INFO CoarseGrainedExecutorBackend: Successfully registered 
> with driver
> 21/03/10 06:59:59 INFO Executor: Starting executor ID 1 on host 100.64.0.192
> 21/03/10 07:00:00 INFO Utils: Successfully started service 
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37956.
> 21/03/10 07:00:00 INFO NettyBlockTransferService: Server created on 
> 100.64.0.192:37956
> 21/03/10 07:00:00 INFO BlockManager: Using 
> org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
> policy
> 21/03/10 07:00:00 INFO BlockManagerMaster: Registering BlockManager 
> BlockManagerId(1, 100.64.0.192, 37956, None)
> 21/03/10 07:00:00 INFO BlockManagerMaster: Registered BlockManager 
> BlockManagerId(1, 100.64.0.192, 37956, None)
> 21/03/10 07:00:00 INFO BlockManager: Initialized BlockManager: 
> BlockManagerId(1, 100.64.0.192, 37956, None)
> 21/03/10 07:00:01 INFO CoarseGrainedExecutorBackend: Got assigned task 0
> 21/03/10 07:00:01 INFO CoarseGrainedExecutorBackend: Got assigned task 1
> 21/03/10 07:00:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 21/03/10 07:00:01 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 21/03/10 07:00:01 INFO Executor: Fetching 
> spark://org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc:7078/jars/spark-examples_2.12-3.0.125067.jar
>  with timestamp 

[jira] [Commented] (SPARK-34684) Hadoop config could not be successfully serilized from driver pods to executor pods

2021-03-23 Thread shanyu zhao (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307375#comment-17307375
 ] 

shanyu zhao commented on SPARK-34684:
-

[~attilapiros] What if we want to connect to HDFS HA with something like 
hdfs://mycluster/...? We need to deliver hdfs-site.xml to the driver or 
executor pods, right? Also, we'd like to control the storage client's (Hadoop 
file system) behavior with the configuration file.

> Hadoop config could not be successfully serilized from driver pods to 
> executor pods
> ---
>
> Key: SPARK-34684
> URL: https://issues.apache.org/jira/browse/SPARK-34684
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.0.1, 3.0.2
>Reporter: Yue Peng
>Priority: Major
>
> I have set HADOOP_CONF_DIR correctly, and I have verified that the hadoop 
> configs have been stored into a configmap and mounted to the driver. However, 
> the spark pi example job keeps failing because the executors do not know how 
> to talk to hdfs. I highly suspect that there is a bug causing it, as manually 
> creating a configmap storing the hadoop configs and mounting it to the 
> executors in the template file fixes the error. 
>  
> Spark submit command:
> /opt/spark-3.0/bin/spark-submit --class org.apache.spark.examples.SparkPi 
> --deploy-mode cluster --master k8s://https://10.***.18.96:6443 
> --num-executors 1 --conf spark.kubernetes.namespace=test --conf 
> spark.kubernetes.container.image= --conf 
> spark.kubernetes.driver.podTemplateFile=/opt/spark-3.0/conf/spark-driver.template
>  --conf 
> spark.kubernetes.executor.podTemplateFile=/opt/spark-3.0/conf/spark-executor.template
>   --conf spark.kubernetes.file.upload.path=/opt/spark-3.0/examples/jars 
> hdfs:///tmp/spark-examples_2.12-3.0.125067.jar 1000
>  
>  
> Error log:
>  
> 21/03/10 06:59:58 INFO TransportClientFactory: Successfully created 
> connection to 
> org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc/100.64.0.191:7078
>  after 608 ms (392 ms spent in bootstraps)
> 21/03/10 06:59:58 INFO SecurityManager: Changing view acls to: root
> 21/03/10 06:59:58 INFO SecurityManager: Changing modify acls to: root
> 21/03/10 06:59:58 INFO SecurityManager: Changing view acls groups to:
> 21/03/10 06:59:58 INFO SecurityManager: Changing modify acls groups to:
> 21/03/10 06:59:58 INFO SecurityManager: SecurityManager: authentication 
> enabled; ui acls disabled; users with view permissions: Set(root); groups 
> with view permissions: Set(); users with modify permissions: Set(root); 
> groups with modify permissions: Set()
> 21/03/10 06:59:59 INFO TransportClientFactory: Successfully created 
> connection to 
> org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc/100.64.0.191:7078
>  after 130 ms (104 ms spent in bootstraps)
> 21/03/10 06:59:59 INFO DiskBlockManager: Created local directory at 
> /var/data/spark-0f541e3d-994f-4c7a-843f-f7dac57dfc13/blockmgr-981cfb62-5b27-4d1a-8fbd-eddb466faf1d
> 21/03/10 06:59:59 INFO MemoryStore: MemoryStore started with capacity 2047.2 
> MiB
> 21/03/10 06:59:59 INFO CoarseGrainedExecutorBackend: Connecting to driver: 
> spark://coarsegrainedschedu...@org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc:7078
> 21/03/10 06:59:59 INFO ResourceUtils: 
> ==
> 21/03/10 06:59:59 INFO ResourceUtils: Resources for spark.executor:
> 21/03/10 06:59:59 INFO ResourceUtils: 
> ==
> 21/03/10 06:59:59 INFO CoarseGrainedExecutorBackend: Successfully registered 
> with driver
> 21/03/10 06:59:59 INFO Executor: Starting executor ID 1 on host 100.64.0.192
> 21/03/10 07:00:00 INFO Utils: Successfully started service 
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37956.
> 21/03/10 07:00:00 INFO NettyBlockTransferService: Server created on 
> 100.64.0.192:37956
> 21/03/10 07:00:00 INFO BlockManager: Using 
> org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
> policy
> 21/03/10 07:00:00 INFO BlockManagerMaster: Registering BlockManager 
> BlockManagerId(1, 100.64.0.192, 37956, None)
> 21/03/10 07:00:00 INFO BlockManagerMaster: Registered BlockManager 
> BlockManagerId(1, 100.64.0.192, 37956, None)
> 21/03/10 07:00:00 INFO BlockManager: Initialized BlockManager: 
> BlockManagerId(1, 100.64.0.192, 37956, None)
> 21/03/10 07:00:01 INFO CoarseGrainedExecutorBackend: Got assigned task 0
> 21/03/10 07:00:01 INFO CoarseGrainedExecutorBackend: Got assigned task 1
> 21/03/10 07:00:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 21/03/10 07:00:01 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 21/03/10 07:00:01 INFO Executor: Fetching 
> 

[jira] [Commented] (SPARK-34297) Add metrics for data loss and offset out range for KafkaMicroBatchStream

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307321#comment-17307321
 ] 

Apache Spark commented on SPARK-34297:
--

User 'yijiacui-db' has created a pull request for this issue:
https://github.com/apache/spark/pull/31944

> Add metrics for data loss and offset out range for KafkaMicroBatchStream
> 
>
> Key: SPARK-34297
> URL: https://issues.apache.org/jira/browse/SPARK-34297
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Structured Streaming
>Affects Versions: 3.2.0
>Reporter: L. C. Hsieh
>Assignee: L. C. Hsieh
>Priority: Major
>
> When testing Structured Streaming, I found it is hard to track data loss when 
> reading from Kafka. The micro-batch scan node has only one metric, the number of 
> output rows. Users have no idea how many times the offsets to fetch were out of 
> range in Kafka, or how many times data loss happened.
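For reference, a minimal sketch of a reader where such data loss can currently go 
unnoticed (broker and topic names are placeholders):
{code}
// With failOnDataLoss=false the query keeps running when offsets have aged out
// on the brokers; today the scan node only reports the number of output rows,
// so the loss itself is invisible in the metrics.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .option("failOnDataLoss", "false")
  .load()
{code}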



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33482) V2 Datasources that extend FileScan preclude exchange reuse

2021-03-23 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-33482:
--
Fix Version/s: (was: 3.0.3)

> V2 Datasources that extend FileScan preclude exchange reuse
> ---
>
> Key: SPARK-33482
> URL: https://issues.apache.org/jira/browse/SPARK-33482
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>Reporter: Bruce Robbins
>Assignee: Peter Toth
>Priority: Major
> Fix For: 3.2.0, 3.1.2
>
>
> Sample query:
> {noformat}
> spark.read.parquet("tbl").createOrReplaceTempView("tbl")
> spark.read.parquet("lookup").createOrReplaceTempView("lookup")
> sql("""
>select tbl.col1, fk1, fk2
>from tbl, lookup l1, lookup l2
>where fk1 = l1.key
>and fk2 = l2.key
> """).explain
> {noformat}
> Test files can be created as so:
> {noformat}
> import scala.util.Random
> val rand = Random
> val tbl = spark.range(1, 1).map { x =>
>   (rand.nextLong.abs % 20,
>rand.nextLong.abs % 20,
>x)
> }.toDF("fk1", "fk2", "col1")
> tbl.write.mode("overwrite").parquet("tbl")
> val lookup = spark.range(0, 20).map { x =>
>   (x + 1, x * 1, (x + 1) * 1)
> }.toDF("key", "col1", "col2")
> lookup.write.mode("overwrite").parquet("lookup")
> {noformat}
> Output with V1 Parquet reader:
> {noformat}
>  == Physical Plan ==
> *(3) Project [col1#2L, fk1#0L, fk2#1L]
> +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
>:- *(3) Project [fk1#0L, fk2#1L, col1#2L]
>:  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
>: :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
>: :  +- *(3) ColumnarToRow
>: : +- FileScan parquet [fk1#0L,fk2#1L,col1#2L] Batched: true, 
> DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: Parquet, 
> Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], 
> PartitionFilters: [], PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)], 
> ReadSchema: struct
>: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> bigint, false]),false), [id=#75]
>:+- *(1) Filter isnotnull(key#6L)
>:   +- *(1) ColumnarToRow
>:  +- FileScan parquet [key#6L] Batched: true, DataFilters: 
> [isnotnull(key#6L)], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], 
> PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
>+- ReusedExchange [key#12L], BroadcastExchange 
> HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
> {noformat}
> With V1 Parquet reader, the exchange for lookup is reused (see last line).
> Output with V2 Parquet reader (spark.sql.sources.useV1SourceList=""):
> {noformat}
>  == Physical Plan ==
> *(3) Project [col1#2L, fk1#0L, fk2#1L]
> +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false
>:- *(3) Project [fk1#0L, fk2#1L, col1#2L]
>:  +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false
>: :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L))
>: :  +- *(3) ColumnarToRow
>: : +- BatchScan[fk1#0L, fk2#1L, col1#2L] ParquetScan DataFilters: 
> [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: parquet, Location: 
> InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], 
> PartitionFilters: [], PushedFilers: [IsNotNull(fk1), IsNotNull(fk2)], 
> ReadSchema: struct, PushedFilters: 
> [IsNotNull(fk1), IsNotNull(fk2)]
>: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> bigint, false]),false), [id=#75]
>:+- *(1) Filter isnotnull(key#6L)
>:   +- *(1) ColumnarToRow
>:  +- BatchScan[key#6L] ParquetScan DataFilters: 
> [isnotnull(key#6L)], Format: parquet, Location: 
> InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], 
> PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: 
> struct, PushedFilters: [IsNotNull(key)]
>+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
> false]),false), [id=#83]
>   +- *(2) Filter isnotnull(key#12L)
>  +- *(2) ColumnarToRow
> +- BatchScan[key#12L] ParquetScan DataFilters: 
> [isnotnull(key#12L)], Format: parquet, Location: 
> InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], 
> PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: 
> struct, PushedFilters: [IsNotNull(key)]
> {noformat}
> With the V2 Parquet reader, the exchange for lookup is not reused (see last 4 
> lines).
> You can see the same issue with the Orc reader (and I assume any other 
> datasource that extends Filescan).
> The issue appears to be this check in 

[jira] [Updated] (SPARK-34756) Fix FileScan equality check

2021-03-23 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-34756:
--
Fix Version/s: (was: 3.0.3)

> Fix FileScan equality check
> ---
>
> Key: SPARK-34756
> URL: https://issues.apache.org/jira/browse/SPARK-34756
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
>Reporter: Peter Toth
>Assignee: Peter Toth
>Priority: Major
> Fix For: 3.2.0, 3.1.2
>
>
> `&&` is missing from `FileScan.equals()`.
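As a purely hypothetical illustration of this bug class (not the actual FileScan code), 
a missing `&&` in a chained comparison makes only the last line the result of `equals`:
{code}
// Hypothetical, self-contained sketch: the first comparison is evaluated and
// discarded (the compiler even warns "a pure expression does nothing"), so two
// scans that differ only in `path` still compare as equal.
case class ScanSketch(path: String, filters: Seq[String]) {
  override def equals(obj: Any): Boolean = obj match {
    case other: ScanSketch =>
      path == other.path
      filters == other.filters   // an `&&` was intended before this line
    case _ => false
  }
  override def hashCode(): Int = filters.hashCode()
}

// ScanSketch("a", Nil) == ScanSketch("b", Nil) unexpectedly returns true, which
// is the kind of mismatch that breaks plan comparisons such as exchange reuse.
{code}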



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema

2021-03-23 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-34842:
--
Reporter: Takeshi Yamamuro  (was: Dongjoon Hyun)

> Corrects the type of date_dim.d_quarter_name in the TPCDS schema
> 
>
> Key: SPARK-34842
> URL: https://issues.apache.org/jira/browse/SPARK-34842
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.1
>Reporter: Takeshi Yamamuro
>Assignee: Takeshi Yamamuro
>Priority: Major
> Fix For: 3.2.0, 3.1.2
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema

2021-03-23 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-34842:
-

Assignee: Takeshi Yamamuro

> Corrects the type of date_dim.d_quarter_name in the TPCDS schema
> 
>
> Key: SPARK-34842
> URL: https://issues.apache.org/jira/browse/SPARK-34842
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.1
>Reporter: Dongjoon Hyun
>Assignee: Takeshi Yamamuro
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema

2021-03-23 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-34842.
---
Fix Version/s: 3.1.2
   3.2.0
   Resolution: Fixed

Issue resolved by pull request 31943
[https://github.com/apache/spark/pull/31943]

> Corrects the type of date_dim.d_quarter_name in the TPCDS schema
> 
>
> Key: SPARK-34842
> URL: https://issues.apache.org/jira/browse/SPARK-34842
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.1
>Reporter: Dongjoon Hyun
>Assignee: Takeshi Yamamuro
>Priority: Major
> Fix For: 3.2.0, 3.1.2
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307253#comment-17307253
 ] 

Apache Spark commented on SPARK-34842:
--

User 'maropu' has created a pull request for this issue:
https://github.com/apache/spark/pull/31943

> Corrects the type of date_dim.d_quarter_name in the TPCDS schema
> 
>
> Key: SPARK-34842
> URL: https://issues.apache.org/jira/browse/SPARK-34842
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.1
>Reporter: Dongjoon Hyun
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34842:


Assignee: (was: Apache Spark)

> Corrects the type of date_dim.d_quarter_name in the TPCDS schema
> 
>
> Key: SPARK-34842
> URL: https://issues.apache.org/jira/browse/SPARK-34842
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.1
>Reporter: Dongjoon Hyun
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307254#comment-17307254
 ] 

Apache Spark commented on SPARK-34842:
--

User 'maropu' has created a pull request for this issue:
https://github.com/apache/spark/pull/31943

> Corrects the type of date_dim.d_quarter_name in the TPCDS schema
> 
>
> Key: SPARK-34842
> URL: https://issues.apache.org/jira/browse/SPARK-34842
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.1
>Reporter: Dongjoon Hyun
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34842:


Assignee: Apache Spark

> Corrects the type of date_dim.d_quarter_name in the TPCDS schema
> 
>
> Key: SPARK-34842
> URL: https://issues.apache.org/jira/browse/SPARK-34842
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Tests
>Affects Versions: 3.2.0, 3.1.1
>Reporter: Dongjoon Hyun
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema

2021-03-23 Thread Dongjoon Hyun (Jira)
Dongjoon Hyun created SPARK-34842:
-

 Summary: Corrects the type of date_dim.d_quarter_name in the TPCDS 
schema
 Key: SPARK-34842
 URL: https://issues.apache.org/jira/browse/SPARK-34842
 Project: Spark
  Issue Type: Bug
  Components: SQL, Tests
Affects Versions: 3.1.1, 3.2.0
Reporter: Dongjoon Hyun






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34841) Push ANSI interval binary expressions into into (if / case) branches

2021-03-23 Thread Max Gekk (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307236#comment-17307236
 ] 

Max Gekk commented on SPARK-34841:
--

See https://github.com/apache/spark/pull/30955#discussion_r599678286

> Push ANSI interval binary expressions into into (if / case) branches
> 
>
> Key: SPARK-34841
> URL: https://issues.apache.org/jira/browse/SPARK-34841
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Max Gekk
>Priority: Major
>
> Add DateAddYMInterval, TimestampAddYMInterval and TimeAdd to 
> PushFoldableIntoBranches.supportedBinaryExpression + tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34841) Push ANSI interval binary expressions into into (if / case) branches

2021-03-23 Thread Max Gekk (Jira)
Max Gekk created SPARK-34841:


 Summary: Push ANSI interval binary expressions into into (if / 
case) branches
 Key: SPARK-34841
 URL: https://issues.apache.org/jira/browse/SPARK-34841
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.2.0
Reporter: Max Gekk


Add DateAddYMInterval, TimestampAddYMInterval and TimeAdd to 
PushFoldableIntoBranches.supportedBinaryExpression + tests.
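For context, a rough sketch of the kind of rewrite this enables (assumed ANSI interval 
syntax; the rewrite shape is inferred from the existing PushFoldableIntoBranches rule, 
not spelled out in this ticket):
{code}
// Conceptually, a foldable interval addition wrapping a CASE/IF
//   CASE WHEN p THEN ts1 ELSE ts2 END + INTERVAL '1' MONTH
// can be pushed into the branches as
//   CASE WHEN p THEN ts1 + INTERVAL '1' MONTH ELSE ts2 + INTERVAL '1' MONTH END
// which lets constant folding simplify each branch independently.
spark.sql("""
  SELECT CASE WHEN id > 5 THEN TIMESTAMP'2021-01-01 00:00:00'
              ELSE TIMESTAMP'2021-06-01 00:00:00'
         END + INTERVAL '1' MONTH AS t
  FROM range(10)
""").explain(true)
{code}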



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-34824) Multiply year-month interval by numeric

2021-03-23 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk resolved SPARK-34824.
--
Fix Version/s: 3.2.0
   Resolution: Fixed

Issue resolved by pull request 31929
[https://github.com/apache/spark/pull/31929]

> Multiply year-month interval by numeric
> ---
>
> Key: SPARK-34824
> URL: https://issues.apache.org/jira/browse/SPARK-34824
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Max Gekk
>Assignee: Max Gekk
>Priority: Major
> Fix For: 3.2.0
>
>
> Support the multiply op over year-month intervals by the numeric types below (a usage sketch follows the list):
> # ByteType
> # ShortType
> # IntegerType
> # LongType
> # FloatType
> # DoubleType
> # DecimalType
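A usage sketch of the new behavior (assumed ANSI year-month interval literal syntax; 
only meaningful once this ticket is in):
{code}
// Multiplying a year-month interval by a few of the numeric types listed above;
// the results are again year-month intervals.
spark.sql("SELECT INTERVAL '1-6' YEAR TO MONTH * 2").show()      // integral
spark.sql("SELECT INTERVAL '1-6' YEAR TO MONTH * 1.5D").show()   // double
spark.sql("SELECT INTERVAL '1-6' YEAR TO MONTH * CAST(2 AS DECIMAL(3, 1))").show()
{code}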



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307198#comment-17307198
 ] 

Apache Spark commented on SPARK-34840:
--

User 'otterc' has created a pull request for this issue:
https://github.com/apache/spark/pull/31934

> Fix cases of corruption in merged shuffle blocks that are pushed
> 
>
> Key: SPARK-34840
> URL: https://issues.apache.org/jira/browse/SPARK-34840
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and 
> merges them was introduced in 
> [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 
> scenarios where the merged blocks get corrupted:
>  # {{StreamCallback.onFailure()}} is called more than once. Initially we 
> assumed that the onFailure callback will be called just once per stream. 
> However, we observed that this is called twice when a client connection is 
> reset. When the client connection is reset then there are 2 events that get 
> triggered in this order.
>  * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. 
> {{StreamInterceptor.exceptionCaught()}} invokes 
> {{callback.onFailure(streamId, cause)}}. This is the first time 
> StreamCallback.onFailure() will be invoked.
>  * {{channelInactive}}. Since the channel closes, the {{channelInactive}} 
> event gets triggered which again is propagated to {{StreamInterceptor}}. 
> {{StreamInterceptor.channelInactive()}} invokes 
> {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the 
> second time StreamCallback.onFailure() will be invoked.
>  # The flag {{isWriting}} is set prematurely to true. This introduces an edge 
> case where a stream that is trying to merge a duplicate block (created 
> because of a speculative task) may interfere with an active stream if the 
> duplicate stream fails.
> Also adding additional changes that improve the code.
>  # Using positional writes all the time because this simplifies the code and 
> with microbenchmarking haven't seen any performance impact.
>  # Additional minor changes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34840:


Assignee: (was: Apache Spark)

> Fix cases of corruption in merged shuffle blocks that are pushed
> 
>
> Key: SPARK-34840
> URL: https://issues.apache.org/jira/browse/SPARK-34840
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and 
> merges them was introduced in 
> [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 
> scenarios where the merged blocks get corrupted:
>  # {{StreamCallback.onFailure()}} is called more than once. Initially we 
> assumed that the onFailure callback will be called just once per stream. 
> However, we observed that this is called twice when a client connection is 
> reset. When the client connection is reset then there are 2 events that get 
> triggered in this order.
>  * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. 
> {{StreamInterceptor.exceptionCaught()}} invokes 
> {{callback.onFailure(streamId, cause)}}. This is the first time 
> StreamCallback.onFailure() will be invoked.
>  * {{channelInactive}}. Since the channel closes, the {{channelInactive}} 
> event gets triggered which again is propagated to {{StreamInterceptor}}. 
> {{StreamInterceptor.channelInactive()}} invokes 
> {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the 
> second time StreamCallback.onFailure() will be invoked.
>  # The flag {{isWriting}} is set prematurely to true. This introduces an edge 
> case where a stream that is trying to merge a duplicate block (created 
> because of a speculative task) may interfere with an active stream if the 
> duplicate stream fails.
> Also adding additional changes that improve the code.
>  # Using positional writes all the time because this simplifies the code and 
> with microbenchmarking haven't seen any performance impact.
>  # Additional minor changes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307197#comment-17307197
 ] 

Apache Spark commented on SPARK-34840:
--

User 'otterc' has created a pull request for this issue:
https://github.com/apache/spark/pull/31934

> Fix cases of corruption in merged shuffle blocks that are pushed
> 
>
> Key: SPARK-34840
> URL: https://issues.apache.org/jira/browse/SPARK-34840
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and 
> merges them was introduced in 
> [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 
> scenarios where the merged blocks get corrupted:
>  # {{StreamCallback.onFailure()}} is called more than once. Initially we 
> assumed that the onFailure callback will be called just once per stream. 
> However, we observed that this is called twice when a client connection is 
> reset. When the client connection is reset then there are 2 events that get 
> triggered in this order.
>  * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. 
> {{StreamInterceptor.exceptionCaught()}} invokes 
> {{callback.onFailure(streamId, cause)}}. This is the first time 
> StreamCallback.onFailure() will be invoked.
>  * {{channelInactive}}. Since the channel closes, the {{channelInactive}} 
> event gets triggered which again is propagated to {{StreamInterceptor}}. 
> {{StreamInterceptor.channelInactive()}} invokes 
> {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the 
> second time StreamCallback.onFailure() will be invoked.
>  # The flag {{isWriting}} is set prematurely to true. This introduces an edge 
> case where a stream that is trying to merge a duplicate block (created 
> because of a speculative task) may interfere with an active stream if the 
> duplicate stream fails.
> Also adding additional changes that improve the code.
>  # Using positional writes all the time because this simplifies the code and 
> with microbenchmarking haven't seen any performance impact.
>  # Additional minor changes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34840:


Assignee: Apache Spark

> Fix cases of corruption in merged shuffle blocks that are pushed
> 
>
> Key: SPARK-34840
> URL: https://issues.apache.org/jira/browse/SPARK-34840
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Assignee: Apache Spark
>Priority: Major
>
> The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and 
> merges them was introduced in 
> [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 
> scenarios where the merged blocks get corrupted:
>  # {{StreamCallback.onFailure()}} is called more than once. Initially we 
> assumed that the onFailure callback will be called just once per stream. 
> However, we observed that this is called twice when a client connection is 
> reset. When the client connection is reset then there are 2 events that get 
> triggered in this order.
>  * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. 
> {{StreamInterceptor.exceptionCaught()}} invokes 
> {{callback.onFailure(streamId, cause)}}. This is the first time 
> StreamCallback.onFailure() will be invoked.
>  * {{channelInactive}}. Since the channel closes, the {{channelInactive}} 
> event gets triggered which again is propagated to {{StreamInterceptor}}. 
> {{StreamInterceptor.channelInactive()}} invokes 
> {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the 
> second time StreamCallback.onFailure() will be invoked.
>  # The flag {{isWriting}} is set prematurely to true. This introduces an edge 
> case where a stream that is trying to merge a duplicate block (created 
> because of a speculative task) may interfere with an active stream if the 
> duplicate stream fails.
> Also adding additional changes that improve the code.
>  # Using positional writes all the time because this simplifies the code and 
> with microbenchmarking haven't seen any performance impact.
>  # Additional minor changes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed

2021-03-23 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-34840:
-

 Summary: Fix cases of corruption in merged shuffle blocks that are 
pushed
 Key: SPARK-34840
 URL: https://issues.apache.org/jira/browse/SPARK-34840
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.1.0
Reporter: Chandni Singh


The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and 
merges them was introduced in 
[#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 
scenarios where the merged blocks get corrupted:
 # {{StreamCallback.onFailure()}} is called more than once (a defensive sketch 
follows after this list). Initially we assumed that the onFailure callback would 
be called just once per stream. However, we observed that it is called twice when 
a client connection is reset. When the client connection is reset, two events get 
triggered in this order.

 * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. 
{{StreamInterceptor.exceptionCaught()}} invokes {{callback.onFailure(streamId, 
cause)}}. This is the first time StreamCallback.onFailure() will be invoked.
 * {{channelInactive}}. Since the channel closes, the {{channelInactive}} event 
gets triggered which again is propagated to {{StreamInterceptor}}. 
{{StreamInterceptor.channelInactive()}} invokes {{callback.onFailure(streamId, 
new ClosedChannelException())}}. This is the second time 
StreamCallback.onFailure() will be invoked.

 # The flag {{isWriting}} is set prematurely to true. This introduces an edge 
case where a stream that is trying to merge a duplicate block (created because 
of a speculative task) may interfere with an active stream if the duplicate 
stream fails.

Also adding additional changes that improve the code.
 # Using positional writes all the time because this simplifies the code and 
with microbenchmarking haven't seen any performance impact.
 # Additional minor changes.
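As mentioned above, a defensive sketch of the first scenario (purely illustrative, 
not the RemoteBlockPushResolver code): a callback that must tolerate onFailure being 
delivered twice can make its cleanup idempotent.
{code}
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative only: both exceptionCaught and channelInactive may end up
// invoking onFailure for the same stream, so the cleanup is guarded by an
// atomic flag and runs exactly once.
class IdempotentStreamCallback(cleanup: Throwable => Unit) {
  private val failed = new AtomicBoolean(false)

  def onFailure(streamId: String, cause: Throwable): Unit = {
    if (failed.compareAndSet(false, true)) {
      cleanup(cause)
    }
    // a second invocation (e.g. from channelInactive) is a no-op
  }
}
{code}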



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34839) FileNotFoundException on _temporary when multiple app write to same table

2021-03-23 Thread Bimalendu Choudhary (Jira)
Bimalendu Choudhary created SPARK-34839:
---

 Summary: FileNotFoundException on _temporary when multiple app 
write to same table
 Key: SPARK-34839
 URL: https://issues.apache.org/jira/browse/SPARK-34839
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0
 Environment: CDH 6.2.1 Hadoop 3.0.0 
Reporter: Bimalendu Choudhary


When multiple Spark applications are writing to the same hive table (but 
different partitions, so they do not interfere with each other in any way), the 
application that finishes first ends up deleting the parent _temporary directory, 
which is still being used by the other applications.

I think the temporary directory used by FileOutputCommitter should be made 
configurable, so that the caller can pass its own unique value as required, 
without having to worry about some other application deleting it unknowingly. 
Something like:
{quote}
 public static final String PENDING_DIR_NAME_DEFAULT = "_temporary";
 public static final String PENDING_DIR_NAME_KEY =
 "mapreduce.fileoutputcommitter.tempdir";
{quote}
 

We cannot use mapreduce.fileoutputcommitter.algorithm.version = 2 because of its 
data loss issue, https://issues.apache.org/jira/browse/MAPREDUCE-7282. 

There is a similar Jira, https://issues.apache.org/jira/browse/SPARK-18883, which 
was not resolved. This is a very generic case of one Spark application breaking 
another Spark application that is working on the same table, and it can be 
avoided by making the temporary directory unique or configurable.
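A sketch of how the proposed knob might be used (the configuration key below is only 
the proposal quoted above; it does not exist in released Hadoop or Spark):
{code}
// Hypothetical: give each application its own staging directory name so that
// one job finishing cannot delete a _temporary directory still used by another.
val stagingKey = "mapreduce.fileoutputcommitter.tempdir"   // proposed, not an existing key
val perAppStaging = s"_temporary_${spark.sparkContext.applicationId}"
spark.sparkContext.hadoopConfiguration.set(stagingKey, perAppStaging)

// Placeholder write; two applications appending different partitions of the
// same table would then stage their files under different directories.
spark.range(10).toDF("id")
  .write.mode("append")
  .parquet("/tmp/shared_table")
{code}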



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34838) consider markdownlint

2021-03-23 Thread Josh Soref (Jira)
Josh Soref created SPARK-34838:
--

 Summary: consider markdownlint
 Key: SPARK-34838
 URL: https://issues.apache.org/jira/browse/SPARK-34838
 Project: Spark
  Issue Type: Bug
  Components: Build, Documentation
Affects Versions: 3.1.1
Reporter: Josh Soref


There's apparently a tool called markdownlint which seems to be integrated with 
Visual Studio Code. It suggests, among other things, using {{*}} instead of 
{{-}} for top-level bullets:

https://github.com/DavidAnson/markdownlint/blob/v0.23.1/doc/Rules.md#md004 
https://github.com/apache/spark/pull/30679#issuecomment-804971370



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34837) Support ANSI SQL intervals by the aggregate function `avg`

2021-03-23 Thread Max Gekk (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Gekk updated SPARK-34837:
-
Description: Extend 
org.apache.spark.sql.catalyst.expressions.aggregate.Average to support 
DayTimeIntervalType and YearMonthIntervalType.  (was: Extend 
org.apache.spark.sql.catalyst.expressions.aggregate.Sum to support 
DayTimeIntervalType and YearMonthIntervalType.)

> Support ANSI SQL intervals by the aggregate function `avg`
> --
>
> Key: SPARK-34837
> URL: https://issues.apache.org/jira/browse/SPARK-34837
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Max Gekk
>Priority: Major
>
> Extend org.apache.spark.sql.catalyst.expressions.aggregate.Average to support 
> DayTimeIntervalType and YearMonthIntervalType.
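A sketch of the intended end-user behavior once this is supported (ANSI interval 
literals; this does not yet work on released versions):
{code}
// avg over a year-month interval column should itself return a
// year-month interval.
spark.sql("""
  SELECT avg(i)
  FROM VALUES (INTERVAL '1-0' YEAR TO MONTH),
              (INTERVAL '2-0' YEAR TO MONTH) AS t(i)
""").show()
{code}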



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34837) Support ANSI SQL intervals by the aggregate function `avg`

2021-03-23 Thread Max Gekk (Jira)
Max Gekk created SPARK-34837:


 Summary: Support ANSI SQL intervals by the aggregate function `avg`
 Key: SPARK-34837
 URL: https://issues.apache.org/jira/browse/SPARK-34837
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.2.0
Reporter: Max Gekk


Extend org.apache.spark.sql.catalyst.expressions.aggregate.Sum to support 
DayTimeIntervalType and YearMonthIntervalType.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34719) fail if the view query has duplicated column names

2021-03-23 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan updated SPARK-34719:

Fix Version/s: 3.2.0
   2.4.8

> fail if the view query has duplicated column names
> --
>
> Key: SPARK-34719
> URL: https://issues.apache.org/jira/browse/SPARK-34719
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.7, 3.0.0, 3.1.0, 3.1.1
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.4.8, 3.2.0, 3.1.2, 3.0.3
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-34421) Custom functions can't be used in temporary views with CTEs

2021-03-23 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-34421.
-
Resolution: Fixed

> Custom functions can't be used in temporary views with CTEs
> ---
>
> Key: SPARK-34421
> URL: https://issues.apache.org/jira/browse/SPARK-34421
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.0
> Environment: Databricks Runtime 8.0
>Reporter: Lauri Koobas
>Assignee: Peter Toth
>Priority: Blocker
> Fix For: 3.0.3, 3.1.1
>
>
> The following query works in Spark 3.0 but not in Spark 3.1.
>   
>  Start with:
>  {{spark.udf.registerJavaFunction("custom_func", 
> "com.stuff.path.custom_func", LongType())}}
>   
>  Works: * {{select custom_func()}}
>  * {{create temporary view blaah as select custom_func()}}
>  * {{with step_1 as ( select custom_func() ) select * from step_1}}
> Broken:
>  {{create temporary view blaah as with step_1 as ( select custom_func() ) 
> select * from step_1}}
>   
>  followed by:
>  {{select * from blaah}}
>   
>  Error:
>  {{Error in SQL statement: AnalysisException: No handler for UDF/UDAF/UDTF 
> '}}{{com.stuff.path.custom_func}}{{';}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34421) Custom functions can't be used in temporary views with CTEs

2021-03-23 Thread Wenchen Fan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307101#comment-17307101
 ] 

Wenchen Fan commented on SPARK-34421:
-

[~laurikoobas] It was an internal mistake that this fix didn't go into DBR 8.1, 
and it was already fixed 8 hours ago. Apache Spark itself has no problem at all.

> Custom functions can't be used in temporary views with CTEs
> ---
>
> Key: SPARK-34421
> URL: https://issues.apache.org/jira/browse/SPARK-34421
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.0
> Environment: Databricks Runtime 8.0
>Reporter: Lauri Koobas
>Assignee: Peter Toth
>Priority: Blocker
> Fix For: 3.1.1, 3.0.3
>
>
> The following query works in Spark 3.0 but not in Spark 3.1.
>   
>  Start with:
>  {{spark.udf.registerJavaFunction("custom_func", 
> "com.stuff.path.custom_func", LongType())}}
>   
>  Works: * {{select custom_func()}}
>  * {{create temporary view blaah as select custom_func()}}
>  * {{with step_1 as ( select custom_func() ) select * from step_1}}
> Broken:
>  {{create temporary view blaah as with step_1 as ( select custom_func() ) 
> select * from step_1}}
>   
>  followed by:
>  {{select * from blaah}}
>   
>  Error:
>  {{Error in SQL statement: AnalysisException: No handler for UDF/UDAF/UDTF 
> '}}{{com.stuff.path.custom_func}}{{';}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34836) DataSourceV2Relation with column filter fails with ClassCastException at collectAsList

2021-03-23 Thread Markus Riedl-Ehrenleitner (Jira)
Markus Riedl-Ehrenleitner created SPARK-34836:
-

 Summary: DataSourceV2Relation with column filter fails with 
ClassCastException at collectAsList
 Key: SPARK-34836
 URL: https://issues.apache.org/jira/browse/SPARK-34836
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.1.1
Reporter: Markus Riedl-Ehrenleitner


After trying to upgrade to 3.1.1, multiple of our test cases fail with a 
ClassCastException at *DataFrame.collectAsList()*

Produced exception:
{noformat}
java.lang.ClassCastException: class 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
class org.apache.spark.sql.catalyst.expressions.UnsafeRow 
(org.apache.spark.sql.catalyst.expressions.GenericInternalRow and 
org.apache.spark.sql.catalyst.expressions.UnsafeRow are in unnamed module of 
loader 'app')
 at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
 ~[spark-sql_2.12-3.1.1.jar:3.1.1]
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
 at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
 at org.apache.spark.scheduler.Task.run(Task.scala:131) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
 at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
 ~[spark-core_2.12-3.1.1.jar:3.1.1]
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 
~[spark-core_2.12-3.1.1.jar:3.1.1]
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 
[spark-core_2.12-3.1.1.jar:3.1.1]
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
 at java.lang.Thread.run(Thread.java:834) [?:?]
2021-03-23 13:00:26.974 WARN [org.apache.spark.scheduler.TaskSetManager] Lost 
task 0.0 in stage 0.0 (TID 0) (192.168.0.6 executor driver): 
java.lang.ClassCastException: class 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to 
class org.apache.spark.sql.catalyst.expressions.UnsafeRow 
(org.apache.spark.sql.catalyst.expressions.GenericInternalRow and 
org.apache.spark.sql.catalyst.expressions.UnsafeRow are in unnamed module of 
loader 'app')
 at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
 at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:131)
 at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834){noformat}
 

All test cases follow this pattern:
{code:java}
StructField aDouble = DataTypes.createStructField("aDouble", 
DataTypes.DoubleType, true);
StructField aDoubleArray = DataTypes.createStructField("aDoubleArray", 
DataTypes.createArrayType(DataTypes.DoubleType), true);

StructType schema = DataTypes.createStructType(Arrays.asList(aDouble, 
aDoubleArray));

Column aDoubleFilter = functions.column("aDouble").equalTo(functions.lit(1d));
Column aDoubleArrayFilter = functions.aggregate(
 functions.col("aDoubleArray"),
 functions.lit(false),
 (seed, column) -> seed.or(column.equalTo(functions.lit(1d;
Column filter = aDoubleFilter.or(aDoubleArrayFilter);

CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(new 
HashMap<>());
Dataset dataset = Dataset.ofRows(sparkSession, 
DataSourceV2Relation.create(new ReadTable(schema), null, null, options));

Dataset filtered = dataset.filter(filter);

List collected = filtered.collectAsList();
{code}
 

ReadTable is a *org.apache.spark.sql.connector.catalog.Table*, with the given 
schema 

[jira] [Created] (SPARK-34835) Support TimestampType in UDT

2021-03-23 Thread Darcy Shen (Jira)
Darcy Shen created SPARK-34835:
--

 Summary: Support TimestampType in UDT
 Key: SPARK-34835
 URL: https://issues.apache.org/jira/browse/SPARK-34835
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 3.1.1, 3.0.2
Reporter: Darcy Shen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34830) Some UDF calls inside transform are broken

2021-03-23 Thread Pavel Chernikov (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307055#comment-17307055
 ] 

Pavel Chernikov commented on SPARK-34830:
-

Good catch! I tend to agree now that the issues are related.

> Some UDF calls inside transform are broken
> --
>
> Key: SPARK-34830
> URL: https://issues.apache.org/jira/browse/SPARK-34830
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Daniel Solow
>Priority: Major
>
> Let's say I want to create a UDF to do a simple lookup on a string:
> {code:java}
> import org.apache.spark.sql.{functions => f}
> val M = Map("a" -> "abc", "b" -> "defg")
> val BM = spark.sparkContext.broadcast(M)
> val LOOKUP = f.udf((s: String) => BM.value.get(s))
> {code}
> Now if I have the following dataframe:
> {code:java}
> val df = Seq(
> Tuple1(Seq("a", "b"))
> ).toDF("arr")
> {code}
> and I want to run this UDF over each element in the array, I can do:
> {code:java}
> df.select(f.transform($"arr", i => LOOKUP(i)).as("arr")).show(false)
> {code}
> This should show:
> {code:java}
> +-----------+
> |arr        |
> +-----------+
> |[abc, defg]|
> +-----------+
> {code}
> However it actually shows:
> {code:java}
> +-----------+
> |arr        |
> +-----------+
> |[def, defg]|
> +-----------+
> {code}
> It's also broken for SQL (even without DSL). This gives the same result:
> {code:java}
> spark.udf.register("LOOKUP",(s: String) => BM.value.get(s))
> df.selectExpr("TRANSFORM(arr, a -> LOOKUP(a)) AS arr").show(false)
> {code}
> Note that "def" is not even in the map I'm using.
> This is a big problem because it breaks existing code/UDFs. I noticed this 
> because the job I ported from 2.4.5 to 3.1.1 seemed to be working, but was 
> actually producing broken data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-34366) Add metric interfaces to DS v2

2021-03-23 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-34366.
-
Fix Version/s: 3.2.0
   Resolution: Fixed

Issue resolved by pull request 31476
[https://github.com/apache/spark/pull/31476]

> Add metric interfaces to DS v2
> --
>
> Key: SPARK-34366
> URL: https://issues.apache.org/jira/browse/SPARK-34366
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: L. C. Hsieh
>Assignee: L. C. Hsieh
>Priority: Major
> Fix For: 3.2.0
>
>
> Add a few public API changes to DS v2, so that a DS v2 scan can report metrics 
> to Spark.
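A hypothetical sketch of the idea (the actual interface names and signatures are 
defined in the linked pull request and may differ):
{code}
// Hypothetical shape only: a scan exposes named metrics, and Spark aggregates
// the per-task values to show them alongside the usual "number of output rows".
trait CustomMetricSketch {
  def name(): String
  def description(): String
  def aggregateTaskMetrics(taskMetrics: Array[Long]): String
}

class DataLossMetric extends CustomMetricSketch {
  override def name(): String = "dataLoss"
  override def description(): String = "number of times data loss was detected"
  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String =
    taskMetrics.sum.toString
}
{code}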



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34829) transform_values return identical values when it's used with udf that returns reference type

2021-03-23 Thread Pavel Chernikov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pavel Chernikov updated SPARK-34829:

Summary: transform_values return identical values when it's used with udf 
that returns reference type  (was: transform_values return identical values 
while operating on complex types)

> transform_values return identical values when it's used with udf that returns 
> reference type
> 
>
> Key: SPARK-34829
> URL: https://issues.apache.org/jira/browse/SPARK-34829
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Pavel Chernikov
>Priority: Major
>
> If return value of an {{udf}} that is passed to {{transform_values}} is an 
> {{AnyRef}}, then the transformation returns identical new values for each map 
> key (to be more precise, each newly obtained value overrides values for all 
> previously processed keys).
> Consider following examples:
> {code:java}
> case class Bar(i: Int)
> val square = udf((b: Bar) => b.i * b.i)
> val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
> df.withColumn("map_square", transform_values(col("map"), (_, v) => 
> square(v))).show(truncate = false)
> +--++
> |map                           |map_square              |
> +--++
> |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}|
> +--++
> {code}
> vs 
> {code:java}
> case class Bar(i: Int)
> case class BarSquare(i: Int)
> val square = udf((b: Bar) => BarSquare(b.i * b.i))
> val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
> df.withColumn("map_square", transform_values(col("map"), (_, v) => 
> square(v))).show(truncate = false)
> +--+--+
> |map                           |map_square                    |
> +--+--+
> |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}|
> +--+--+
> {code}
> or even just this one
> {code:java}
> case class Foo(s: String)
> val reverse = udf((f: Foo) => f.s.reverse)
> val df = Seq(Map(1 -> Foo("abc"), 2 -> Foo("klm"), 3 -> 
> Foo("xyz"))).toDF("map")
> df.withColumn("map_reverse", transform_values(col("map"), (_, v) => 
> reverse(v))).show(truncate = false)
> ++--+
> |map |map_reverse   |
> ++--+
> |{1 -> {abc}, 2 -> {klm}, 3 -> {xyz}}|{1 -> zyx, 2 -> zyx, 3 -> zyx}|
> ++--+
> {code}
> After playing with 
> {{org.apache.spark.sql.catalyst.expressions.TransformValues}} it looks like 
> something wrong is happening while executing this line:
> {code:java}
> resultValues.update(i, functionForEval.eval(inputRow)){code}
> To be more precise , it's all about {{functionForEval.eval(inputRow)}} , 
> because if you do something like this:
> {code:java}
> println(s"RESULTS PRIOR TO EVALUATION - $resultValues")
> val resultValue = functionForEval.eval(inputRow)
> println(s"RESULT - $resultValue")
> println(s"RESULTS PRIOR TO UPDATE - $resultValues")
> resultValues.update(i, resultValue)
> println(s"RESULTS AFTER UPDATE - $resultValues"){code}
> You'll see in the logs, something like:
> {code:java}
> RESULTS PRIOR TO EVALUATION - [null,null,null] 
> RESULT - [0,1] 
> RESULTS PRIOR TO UPDATE - [null,null,null]
> RESULTS AFTER UPDATE - [[0,1],null,null]
> --
> RESULTS PRIOR TO EVALUATION - [[0,1],null,null] 
> RESULT - [0,4]
> RESULTS PRIOR TO UPDATE - [[0,4],null,null] 
> RESULTS  AFTER UPDATE - [[0,4],[0,4],null]
> --
> RESULTS PRIOR TO EVALUATION - [[0,4],[0,4],null] 
> RESULT - [0,9]
> RESULTS PRIOR TO UPDATE - [[0,9],[0,9],null]
> RESULTS  AFTER UPDATE - [[0,9],[0,9],[0,9]
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34829) transform_values return identical values while operating on complex types

2021-03-23 Thread Pavel Chernikov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pavel Chernikov updated SPARK-34829:

Description: 
If the return value of a {{udf}} that is passed to {{transform_values}} is an 
{{AnyRef}}, then the transformation returns identical new values for each map 
key (to be more precise, each newly obtained value overrides the values for all 
previously processed keys).

Consider following examples:
{code:java}
case class Bar(i: Int)
val square = udf((b: Bar) => b.i * b.i)
val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
df.withColumn("map_square", transform_values(col("map"), (_, v) => 
square(v))).show(truncate = false)
+------------------------------+------------------------+
|map                           |map_square              |
+------------------------------+------------------------+
|{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}|
+------------------------------+------------------------+
{code}
vs 
{code:java}
case class Bar(i: Int)
case class BarSquare(i: Int)
val square = udf((b: Bar) => BarSquare(b.i * b.i))
val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
df.withColumn("map_square", transform_values(col("map"), (_, v) => 
square(v))).show(truncate = false)
+------------------------------+------------------------------+
|map                           |map_square                    |
+------------------------------+------------------------------+
|{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}|
+------------------------------+------------------------------+
{code}
or even just this one
{code:java}
case class Foo(s: String)
val reverse = udf((f: Foo) => f.s.reverse)
val df = Seq(Map(1 -> Foo("abc"), 2 -> Foo("klm"), 3 -> Foo("xyz"))).toDF("map")
df.withColumn("map_reverse", transform_values(col("map"), (_, v) => 
reverse(v))).show(truncate = false)
+------------------------------------+------------------------------+
|map                                 |map_reverse                   |
+------------------------------------+------------------------------+
|{1 -> {abc}, 2 -> {klm}, 3 -> {xyz}}|{1 -> zyx, 2 -> zyx, 3 -> zyx}|
+------------------------------------+------------------------------+
{code}
After playing with 
{{org.apache.spark.sql.catalyst.expressions.TransformValues}} it looks like 
something wrong is happening while executing this line:
{code:java}
resultValues.update(i, functionForEval.eval(inputRow)){code}
To be more precise, it's all about {{functionForEval.eval(inputRow)}}, because 
if you do something like this:
{code:java}
println(s"RESULTS PRIOR TO EVALUATION - $resultValues")
val resultValue = functionForEval.eval(inputRow)
println(s"RESULT - $resultValue")
println(s"RESULTS PRIOR TO UPDATE - $resultValues")
resultValues.update(i, resultValue)
println(s"RESULTS AFTER UPDATE - $resultValues"){code}
You'll see in the logs, something like:
{code:java}
RESULTS PRIOR TO EVALUATION - [null,null,null] 
RESULT - [0,1] 
RESULTS PRIOR TO UPDATE - [null,null,null]
RESULTS AFTER UPDATE - [[0,1],null,null]
--
RESULTS PRIOR TO EVALUATION - [[0,1],null,null] 
RESULT - [0,4]
RESULTS PRIOR TO UPDATE - [[0,4],null,null] 
RESULTS  AFTER UPDATE - [[0,4],[0,4],null]
--
RESULTS PRIOR TO EVALUATION - [[0,4],[0,4],null] 
RESULT - [0,9]
RESULTS PRIOR TO UPDATE - [[0,9],[0,9],null]
RESULTS  AFTER UPDATE - [[0,9],[0,9],[0,9]
{code}
 

  was:
If map values are {{StructType}} s then behavior of {{transform_values}} is 
inconsistent (it may return identical values). To be more precise, it looks 
like it returns identical values if the return type is {{AnyRef}}.

Consider following examples:
{code:java}
case class Bar(i: Int)
val square = udf((b: Bar) => b.i * b.i)
val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
df.withColumn("map_square", transform_values(col("map"), (_, v) => 
square(v))).show(truncate = false)
+--++
|map                           |map_square              |
+--++
|{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}|
+--++
{code}
vs 
{code:java}
case class Bar(i: Int)
case class BarSquare(i: Int)
val square = udf((b: Bar) => BarSquare(b.i * b.i))
val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
df.withColumn("map_square", transform_values(col("map"), (_, v) => 
square(v))).show(truncate = false)
+--+--+
|map                           |map_square                    |
+--+--+
|{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}|
+--+--+
{code}
or even just this one
{code:java}
case class Foo(s: String)
val reverse = udf((f: Foo) => 

[jira] [Commented] (SPARK-34083) Using TPCDS original definitions for char/varchar columns

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307024#comment-17307024
 ] 

Apache Spark commented on SPARK-34083:
--

User 'maropu' has created a pull request for this issue:
https://github.com/apache/spark/pull/31943

> Using TPCDS original definitions for char/varchar columns
> -
>
> Key: SPARK-34083
> URL: https://issues.apache.org/jira/browse/SPARK-34083
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0, 3.2.0
>Reporter: Kent Yao
>Assignee: Kent Yao
>Priority: Major
> Fix For: 3.1.1
>
>
> Using TPCDS original definitions for char/varchar columns instead of the 
> modified string



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34833) Apply right-padding correctly for correlated subqueries

2021-03-23 Thread Takeshi Yamamuro (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307000#comment-17307000
 ] 

Takeshi Yamamuro commented on SPARK-34833:
--

Ah, thanks, Dongjoon. I forgot to check the previous versions.

> Apply right-padding correctly for correlated subqueries
> ---
>
> Key: SPARK-34833
> URL: https://issues.apache.org/jira/browse/SPARK-34833
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.0, 3.2.0, 3.1.1
>Reporter: Takeshi Yamamuro
>Priority: Blocker
>  Labels: correctness
>
> This ticket aims at fixing the bug where right-padding is not applied for char 
> types inside correlated subqueries.
> For example, the query below returns nothing in master, but the correct result 
> is `c`.
> {code}
> scala> sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING parquet")
> scala> sql(s"CREATE TABLE t2(v VARCHAR(5), c CHAR(7)) USING parquet")
> scala> sql("INSERT INTO t1 VALUES ('c', 'b')")
> scala> sql("INSERT INTO t2 VALUES ('a', 'b')")
> scala> val df = sql("""
>   |SELECT v FROM t1
>   |WHERE 'a' IN (SELECT v FROM t2 WHERE t2.c = t1.c )""".stripMargin)
> scala> df.show()
> +---+
> |  v|
> +---+
> +---+
> {code}
> This is because `ApplyCharTypePadding` does not handle the case above, so 
> right-padding is not applied to `t1.c` in the correlated condition. 
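
Until the fix is in place, one possible workaround sketch is to apply the padding by hand in the correlated predicate. This assumes char values are stored right-padded to their declared length (the padding-on-write behavior), and it is untested against this exact repro:
{code:java}
// Hypothetical workaround, not the actual fix: pad t1.c (CHAR(5)) up to the
// declared width of t2.c (CHAR(7)) so the correlated equality compares
// equal-length strings.
val df = sql("""
  SELECT v FROM t1
  WHERE 'a' IN (SELECT v FROM t2 WHERE t2.c = rpad(t1.c, 7, ' '))""")
df.show()  // should return the row with v = 'c' if the padding assumption holds
{code}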



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34834) There is a potential Netty memory leak in TransportResponseHandler.

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34834:


Assignee: Apache Spark

> There is a potential Netty memory leak in TransportResponseHandler.
> ---
>
> Key: SPARK-34834
> URL: https://issues.apache.org/jira/browse/SPARK-34834
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.7, 3.0.2, 3.1.0, 3.1.1
>Reporter: weixiuli
>Assignee: Apache Spark
>Priority: Major
>
> There is a potential Netty memory leak in TransportResponseHandler.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-34834) There is a potential Netty memory leak in TransportResponseHandler.

2021-03-23 Thread Apache Spark (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-34834:


Assignee: (was: Apache Spark)

> There is a potential Netty memory leak in TransportResponseHandler.
> ---
>
> Key: SPARK-34834
> URL: https://issues.apache.org/jira/browse/SPARK-34834
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.7, 3.0.2, 3.1.0, 3.1.1
>Reporter: weixiuli
>Priority: Major
>
> There is a potential Netty memory leak in TransportResponseHandler.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34834) There is a potential Netty memory leak in TransportResponseHandler.

2021-03-23 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17306989#comment-17306989
 ] 

Apache Spark commented on SPARK-34834:
--

User 'weixiuli' has created a pull request for this issue:
https://github.com/apache/spark/pull/31942

> There is a potential Netty memory leak in TransportResponseHandler.
> ---
>
> Key: SPARK-34834
> URL: https://issues.apache.org/jira/browse/SPARK-34834
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.4.7, 3.0.2, 3.1.0, 3.1.1
>Reporter: weixiuli
>Priority: Major
>
> There is a potential Netty memory leak in TransportResponseHandler.
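
For context, a generic sketch of the Netty reference-counting contract that such leaks usually violate. This is illustrative only and is not the change proposed in the PR above: a reference-counted buffer must be released on every code path, including failure paths, or its pooled memory is never reclaimed.
{code:java}
import io.netty.buffer.ByteBuf
import io.netty.util.ReferenceCountUtil

object LeakSketch {
  // Generic pattern, not Spark code: release the buffer exactly once,
  // even when handling the payload throws.
  def consume(buf: ByteBuf): Unit = {
    try {
      val bytes = new Array[Byte](buf.readableBytes())
      buf.readBytes(bytes)
      // ... handle the payload ...
    } finally {
      ReferenceCountUtil.release(buf)  // decrement the refcount
    }
  }
}
{code}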



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


