[jira] [Resolved] (SPARK-34763) col(), $"" and df("name") should handle quoted column names properly.
[ https://issues.apache.org/jira/browse/SPARK-34763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-34763.
---------------------------------
    Fix Version/s: 3.0.3, 3.1.2, 3.2.0
       Resolution: Fixed

> col(), $"" and df("name") should handle quoted column names properly.
> ---------------------------------------------------------------------
>
>                 Key: SPARK-34763
>                 URL: https://issues.apache.org/jira/browse/SPARK-34763
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.7, 3.0.2, 3.1.1, 3.2.0
>            Reporter: Kousuke Saruta
>            Assignee: Kousuke Saruta
>          Priority: Major
>             Fix For: 3.0.3, 3.1.2, 3.2.0
>
> Quoted column names like `a``b.c` cannot be represented with col(), $"" and df("")
> because they don't handle such column names properly.
> For example, suppose we have the following DataFrame:
> {code}
> val df1 = spark.sql("SELECT 'col1' AS `a``b.c`")
> {code}
> For this DataFrame, the following query executes successfully:
> {code}
> scala> df1.selectExpr("`a``b.c`").show
> +-----+
> |a`b.c|
> +-----+
> | col1|
> +-----+
> {code}
> But the following query fails because df1("`a``b.c`") throws an exception:
> {code}
> scala> df1.select(df1("`a``b.c`")).show
> org.apache.spark.sql.AnalysisException: syntax error in attribute name: `a``b.c`;
>   at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.e$1(unresolved.scala:152)
>   at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$.parseAttributeName(unresolved.scala:162)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:121)
>   at org.apache.spark.sql.Dataset.resolve(Dataset.scala:221)
>   at org.apache.spark.sql.Dataset.col(Dataset.scala:1274)
>   at org.apache.spark.sql.Dataset.apply(Dataset.scala:1241)
>   ... 49 elided
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
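The escaping rule at issue (a literal backtick inside a backtick-quoted identifier is written as two backticks) can be sketched as a small standalone parser. This is a hypothetical Python helper for illustration only, not Spark's actual parseAttributeName from unresolved.scala:

```python
def parse_quoted_name(name: str) -> str:
    """Unescape a backtick-quoted identifier, treating `` as one literal backtick.

    Hypothetical stand-in for Spark's attribute-name parsing; raises
    ValueError on malformed input, mirroring the AnalysisException above.
    """
    if not (len(name) >= 2 and name.startswith("`") and name.endswith("`")):
        raise ValueError("syntax error in attribute name: %s" % name)
    inner = name[1:-1]
    out = []
    i = 0
    while i < len(inner):
        if inner[i] == "`":
            # A doubled backtick decodes to one literal backtick;
            # a lone backtick inside the quotes is a syntax error.
            if i + 1 < len(inner) and inner[i + 1] == "`":
                out.append("`")
                i += 2
            else:
                raise ValueError("syntax error in attribute name: %s" % name)
        else:
            out.append(inner[i])
            i += 1
    return "".join(out)

print(parse_quoted_name("`a``b.c`"))  # a`b.c
```

Under this rule the reported input "`a``b.c`" is well-formed and should resolve to the column a`b.c, which is why df1("`a``b.c`") was expected to work.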
[jira] [Updated] (SPARK-33720) Support submit to k8s only with token
[ https://issues.apache.org/jira/browse/SPARK-33720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-33720:
----------------------------------
    Affects Version/s: (was: 3.0.2)
                       3.2.0

> Support submit to k8s only with token
> -------------------------------------
>
>                 Key: SPARK-33720
>                 URL: https://issues.apache.org/jira/browse/SPARK-33720
>             Project: Spark
>          Issue Type: Improvement
>          Components: Kubernetes
>    Affects Versions: 3.2.0
>            Reporter: hong dongdong
>            Assignee: hong dongdong
>          Priority: Major
>             Fix For: 3.2.0
>
[jira] [Assigned] (SPARK-33720) Support submit to k8s only with token
[ https://issues.apache.org/jira/browse/SPARK-33720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun reassigned SPARK-33720:
-------------------------------------
    Assignee: hong dongdong

> Support submit to k8s only with token
> -------------------------------------
>
>                 Key: SPARK-33720
>                 URL: https://issues.apache.org/jira/browse/SPARK-33720
>             Project: Spark
>          Issue Type: Improvement
>          Components: Kubernetes
>    Affects Versions: 3.0.2
>            Reporter: hong dongdong
>            Assignee: hong dongdong
>          Priority: Major
>
[jira] [Resolved] (SPARK-33720) Support submit to k8s only with token
[ https://issues.apache.org/jira/browse/SPARK-33720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun resolved SPARK-33720.
-----------------------------------
    Fix Version/s: 3.2.0
       Resolution: Fixed

Issue resolved by pull request 30684
[https://github.com/apache/spark/pull/30684]

> Support submit to k8s only with token
> -------------------------------------
>
>                 Key: SPARK-33720
>                 URL: https://issues.apache.org/jira/browse/SPARK-33720
>             Project: Spark
>          Issue Type: Improvement
>          Components: Kubernetes
>    Affects Versions: 3.0.2
>            Reporter: hong dongdong
>            Assignee: hong dongdong
>          Priority: Major
>             Fix For: 3.2.0
>
[jira] [Assigned] (SPARK-34847) Simplify ResolveAggregateFunctions
[ https://issues.apache.org/jira/browse/SPARK-34847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-34847:
------------------------------------
    Assignee: (was: Apache Spark)

> Simplify ResolveAggregateFunctions
> ----------------------------------
>
>                 Key: SPARK-34847
>                 URL: https://issues.apache.org/jira/browse/SPARK-34847
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Wenchen Fan
>          Priority: Major
>
[jira] [Commented] (SPARK-34847) Simplify ResolveAggregateFunctions
[ https://issues.apache.org/jira/browse/SPARK-34847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307571#comment-17307571 ]

Apache Spark commented on SPARK-34847:
--------------------------------------

User 'cloud-fan' has created a pull request for this issue:
https://github.com/apache/spark/pull/31947

> Simplify ResolveAggregateFunctions
> ----------------------------------
>
>                 Key: SPARK-34847
>                 URL: https://issues.apache.org/jira/browse/SPARK-34847
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Wenchen Fan
>          Priority: Major
>
[jira] [Assigned] (SPARK-34847) Simplify ResolveAggregateFunctions
[ https://issues.apache.org/jira/browse/SPARK-34847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-34847:
------------------------------------
    Assignee: Apache Spark

> Simplify ResolveAggregateFunctions
> ----------------------------------
>
>                 Key: SPARK-34847
>                 URL: https://issues.apache.org/jira/browse/SPARK-34847
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Wenchen Fan
>            Assignee: Apache Spark
>          Priority: Major
>
[jira] [Created] (SPARK-34847) Simplify ResolveAggregateFunctions
Wenchen Fan created SPARK-34847:
-----------------------------------

             Summary: Simplify ResolveAggregateFunctions
                 Key: SPARK-34847
                 URL: https://issues.apache.org/jira/browse/SPARK-34847
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.2.0
            Reporter: Wenchen Fan
[jira] [Updated] (SPARK-34835) Support TimestampType in UDT
[ https://issues.apache.org/jira/browse/SPARK-34835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Darcy Shen updated SPARK-34835:
-------------------------------
    Description:
For user defined type with TimestampType, conversion from pandas to pyarrow and from pyarrow to Spark UnsafeRow needs to be carefully handled.

*From pandas to pyarrow*

{code:java}
if isinstance(dt, UserDefinedType):
    s = s.apply(dt.serialize)
    t = to_arrow_type(dt)
    array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
{code}

The above code may not work. If the UDT is like:

{code:java}
class ExamplePointWithTimeUDT(UserDefinedType):
    """
    User-defined type (UDT) for ExamplePointWithTime.
    """

    @classmethod
    def sqlType(self):
        return StructType([
            StructField("x", DoubleType(), False),
            StructField("y", DoubleType(), True),
            StructField("ts", TimestampType(), False),
        ])

    @classmethod
    def module(cls):
        return 'pyspark.sql.tests'

    @classmethod
    def scalaUDT(cls):
        return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'

    def serialize(self, obj):
        return [obj.x, obj.y, obj.ts]

    def deserialize(self, datum):
        return ExamplePointWithTime(datum[0], datum[1], datum[2])


class ExamplePointWithTime:
    """
    An example class to demonstrate UDT in Scala, Java, and Python.
    """

    __UDT__ = ExamplePointWithTimeUDT()

    def __init__(self, x, y, ts):
        self.x = x
        self.y = y
        self.ts = ts

    def __repr__(self):
        return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)

    def __str__(self):
        return "(%s,%s,%s)" % (self.x, self.y, self.ts)

    def __eq__(self, other):
        return isinstance(other, self.__class__) \
            and other.x == self.x and other.y == self.y \
            and other.ts == self.ts
{code}

*From pyarrow to Spark Internal*

Serialize and deserialize will fail. See the failed PR demo: https://github.com/eddyxu/spark/pull/4

  was: the same description, except that the final link lacked the "See the failed PR demo:" lead-in.

> Support TimestampType in UDT
> ----------------------------
>
>                 Key: SPARK-34835
>                 URL: https://issues.apache.org/jira/browse/SPARK-34835
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 3.0.2, 3.1.1
>            Reporter: Darcy Shen
>            Priority: Major
>
> For user defined type with TimestampType, conversion from pandas to pyarrow
> and from pyarrow to Spark UnsafeRow needs to be carefully handled.
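Independent of Arrow, the serialize/deserialize contract of the UDT above can be exercised as a plain-Python round trip. The sketch below uses a simplified ExamplePointWithTime and free-standing functions, without the Spark base classes or pyarrow, purely to show what the struct form [x, y, ts] carries:

```python
import datetime

class ExamplePointWithTime:
    """Simplified stand-in for the UDT's value class (no Spark imports)."""
    def __init__(self, x, y, ts):
        self.x, self.y, self.ts = x, y, ts
    def __eq__(self, other):
        return (isinstance(other, self.__class__)
                and other.x == self.x and other.y == self.y
                and other.ts == self.ts)

def serialize(obj):
    # Mirrors ExamplePointWithTimeUDT.serialize: the object becomes a list
    # of field values in sqlType() order (x, y, ts).
    return [obj.x, obj.y, obj.ts]

def deserialize(datum):
    return ExamplePointWithTime(datum[0], datum[1], datum[2])

p = ExamplePointWithTime(1.0, 2.0, datetime.datetime(2021, 3, 23, 12, 0, 0))
assert deserialize(serialize(p)) == p  # round trip preserves the timestamp
```

The round trip itself is trivial; the reported failure is in the conversion layer, where pa.StructArray.from_pandas must map the ts field to an Arrow timestamp type rather than a plain Python object.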
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307557#comment-17307557 ]

Dongjoon Hyun commented on SPARK-18105:
---------------------------------------

Got it. Thank you for the details, [~devaraj]. Let's keep this open for a while because this issue needs more information.

> LZ4 failed to decompress a stream of shuffled data
> --------------------------------------------------
>
>                 Key: SPARK-18105
>                 URL: https://issues.apache.org/jira/browse/SPARK-18105
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Davies Liu
>          Priority: Major
>
> When lz4 is used to compress the shuffle files, it may fail to decompress them with "Stream is corrupted":
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89
[jira] [Commented] (SPARK-34674) Spark app on k8s doesn't terminate without call to sparkContext.stop() method
[ https://issues.apache.org/jira/browse/SPARK-34674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307554#comment-17307554 ]

Dongjoon Hyun commented on SPARK-34674:
---------------------------------------

Let's keep this issue open for a while to collect more opinions. We need more information for this issue report.

> Spark app on k8s doesn't terminate without call to sparkContext.stop() method
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-34674
>                 URL: https://issues.apache.org/jira/browse/SPARK-34674
>             Project: Spark
>          Issue Type: Bug
>          Components: Kubernetes
>    Affects Versions: 3.1.1
>            Reporter: Sergey Kotlov
>          Priority: Major
>
> Hello!
> I have run into a problem: if I don't call sparkContext.stop() explicitly, the Spark driver process doesn't terminate even after its main method has completed. This behaviour differs from Spark on YARN, where stopping the SparkContext manually is not required.
> It looks like the problem is in the use of non-daemon threads, which prevent the driver JVM process from terminating. At least I see two non-daemon threads if I don't call sparkContext.stop():
> {code:java}
> Thread[OkHttp kubernetes.default.svc,5,main]
> Thread[OkHttp kubernetes.default.svc Writer,5,main]
> {code}
> Could you tell me, please, whether it is possible to solve this problem?
> The Docker image from the official release of spark-3.1.1 hadoop3.2 is used.
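The mechanism behind the report is that the JVM, like CPython, waits for all non-daemon threads to finish before the process can exit. The sketch below illustrates this in Python (not the Spark/OkHttp code itself): a child process whose only background thread is a daemon exits as soon as its main thread returns, whereas the same thread marked non-daemon would keep the process alive for the full sleep, which is the behaviour described for the OkHttp threads in the driver:

```python
import subprocess
import sys
import time

# Child program: one background thread sleeping 60 seconds. With daemon=True
# the interpreter exits immediately after "main done"; flipping daemon to
# False would make the process linger for the full 60 seconds.
child = """
import threading, time
t = threading.Thread(target=time.sleep, args=(60,), daemon=True)
t.start()
print("main done")
"""

start = time.time()
out = subprocess.run([sys.executable, "-c", child],
                     capture_output=True, text=True, timeout=30)
elapsed = time.time() - start
print(out.stdout.strip(), "in %.1fs" % elapsed)  # exits quickly, well under 60s
```

The Spark-side fixes in this space amount to the same choice: either mark such housekeeping threads as daemon threads, or stop them explicitly (which is what sparkContext.stop() does).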
[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307502#comment-17307502 ] Jason Yarbrough commented on SPARK-34843: - I'm definitely interested. Thank you :) Fortunately, this proposed formula should be able to handle potential overflow issues just as well as the old method, and also retains the highest level of accuracy you can get with the stride size (short of adjusting the partition count... I've also coded up functionality that can dynamically determine that). > JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > Attachments: SPARK-34843.patch > > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / > (numPartitions - 2.0))).floor.toLong > > An example (using a date column): > Say you're creating 1,000 partitions. If you provide a lower bound of > 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 > (translated to 18563), Spark determines the stride size as follows: > > (18563L / 1000L) - (-15611 / 1000L) = 33 > Starting from the lower bound, doing 998 strides of 33, you end up at > 2017-06-05 (17323). 
This is over 3 years of extra data that will go into the > last partition, and depending on the shape of the data could cause a very > long running task at the end of a job. > > Using the formula I'm proposing, you'd get: > ((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34 > This would put the upper bound at 2020-02-28 (18321), which is much closer to > the original supplied upper bound. This is the best you can do to get as > close as possible to the upper bound (without adjusting the number of > partitions). For example, a stride size of 35 would go well past the supplied > upper bound (over 2 years, 2022-11-22). > > In the above example, there is only a difference of 1 between the stride size > using the current formula and the stride size using the proposed formula, but > with greater distance between the lower and upper bounds, or a lower number > of partitions, the difference can be much greater.
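The arithmetic in the ticket's example can be checked directly; the following standalone Scala sketch (not Spark code) reproduces the numbers above:

```scala
// Bounds from the ticket's example, expressed as days since the epoch.
val lowerBound = -15611L // 1927-04-05
val upperBound = 18563L  // 2020-10-27
val numPartitions = 1000L

// Current formula: each Long division truncates toward zero before subtracting.
val currentStride = upperBound / numPartitions - lowerBound / numPartitions
println(currentStride) // 33

// Proposed formula: divide as Double, subtract, then floor.
val proposedStride =
  ((upperBound / (numPartitions - 2.0)) - (lowerBound / (numPartitions - 2.0))).floor.toLong
println(proposedStride) // 34

// Where the last partition begins after 998 full strides:
println(lowerBound + 998 * currentStride)  // 17323 = 2017-06-05
println(lowerBound + 998 * proposedStride) // 18321 = 2020-02-28
```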
[jira] [Commented] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition
[ https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307500#comment-17307500 ] Jason Yarbrough commented on SPARK-34844: - The way I'm currently supplying it is by using a Map with lowerBound, upperBound, numPartitions, and partitionColumn. For example: Map( "lowerBound" -> "1", "upperBound" -> "10", "numPartitions" -> "1000", "partitionColumn" -> "SomeIndexedColumn" ) In this case, I would expect the Spark code to respect the lower boundary, but it won't. The thing that makes it even more important is some users may provide very well picked bounds. For instance, I've coded up an "auto-partitioner." One of the things it does is use the approxQuantile function to determine a very accurate lower and upper bound (based off the partition count). I actually wish we could provide more breakpoints than just the lower and upper bounds, but that's for another ticket ;). > JDBCRelation columnPartition function includes the first stride in the lower > partition > -- > > Key: SPARK-34844 > URL: https://issues.apache.org/jira/browse/SPARK-34844 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > > Currently, columnPartition in JDBCRelation contains logic that adds the first > stride into the lower partition. Because of this, the lower bound isn't used > as the ceiling for the lower partition. > For example, say we have data 0-10, 10 partitions, and the lowerBound is set > to 1. The lower/first partition should contain anything < 1. However, in the > current implementation, it would include anything < 2. > A possible easy fix would be changing the following code on line 132: > currentValue += stride > To: > if (i != 0) currentValue += stride > Or include currentValue += stride within the if statement on line 131... > although this creates a pretty bad looking side-effect. 
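The first-stride behaviour described in SPARK-34844 can be sketched with a small self-contained helper, modeled on the ticket's description of columnPartition (hypothetical names, not the actual JDBCRelation source). The `skipFirstStride` flag corresponds to the proposed `if (i != 0) currentValue += stride` change:

```scala
// Build the per-partition WHERE clauses the way the ticket describes
// (a simplified sketch of the logic, not the real Spark implementation).
def partitionClauses(column: String, lowerBound: Long, upperBound: Long,
                     numPartitions: Int, skipFirstStride: Boolean): Seq[String] = {
  val stride = upperBound / numPartitions - lowerBound / numPartitions
  var currentValue = lowerBound
  (0 until numPartitions).map { i =>
    val lBound = if (i != 0) Some(s"$column >= $currentValue") else None
    // Proposed fix: don't advance past the lower bound for the first partition.
    if (!skipFirstStride || i != 0) currentValue += stride
    val uBound = if (i != numPartitions - 1) Some(s"$column < $currentValue") else None
    (lBound, uBound) match {
      case (Some(l), Some(u)) => s"$l AND $u"
      case (None, Some(u))    => u
      case (Some(l), None)    => l
      case _                  => "1=1"
    }
  }
}

// Data 0-10, 10 partitions, lowerBound = 1, as in the ticket's example:
println(partitionClauses("c", 1L, 10L, 10, skipFirstStride = false).head) // c < 2  (current behaviour)
println(partitionClauses("c", 1L, 10L, 10, skipFirstStride = true).head)  // c < 1  (respects the bound)
```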
[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307498#comment-17307498 ] Takeshi Yamamuro commented on SPARK-34843: -- If the overflow is rare, it might be worth improving it. For example, I think we might be able to improve it by checking whether the overflow can happen or not in advance. Anyway, if you're interested in this improvement, please feel free to open a PR for it.
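To make the overflow concern concrete: a naive `(upperBound - lowerBound) / (numPartitions - 2)` wraps around when the bounds sit near the extremes of Long, which is exactly what dividing each bound first avoids. A standalone illustration (plain Scala, not Spark code):

```scala
val lo = Long.MinValue
val hi = Long.MaxValue
val n = 1000L

// Naive form: the subtraction wraps around before the division ever runs.
println(hi - lo)             // -1 (Long overflow)
println((hi - lo) / (n - 2)) // 0: a useless stride

// Per-term form, as in the proposed formula: each bound is divided first
// (as Double), so the raw bounds are never subtracted as Longs.
val stride = ((hi / (n - 2.0)) - (lo / (n - 2.0))).floor.toLong
println(stride > 0) // true
```

The trade-off is Double rounding at huge magnitudes, but a stride that is slightly imprecise beats one that has overflowed.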
[jira] [Comment Edited] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307494#comment-17307494 ] Jason Yarbrough edited comment on SPARK-34843 at 3/24/21, 1:32 AM: --- Updated the formula. I also updated the comment and included the example you provided, to help others understand. was (Author: hanover-fiste): Updated the formula.
[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34843: Attachment: (was: SPARK-34843.patch)
[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34843: Attachment: SPARK-34843.patch
[jira] [Comment Edited] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition
[ https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307496#comment-17307496 ] Takeshi Yamamuro edited comment on SPARK-34844 at 3/24/21, 1:30 AM: > It depends on how much data skew you have. Yea, I agree with that. What I wanted to say is that whether the proposed one works well or not also depends on data distribution. > However, my question would be why skip using the lower bound anyway? It would > make more sense to use what the user supplied. IIUC we don't have a strong reason on that (we've used the initial implementation for a long time: [https://github.com/apache/spark/commit/8f471a66db0571a76a21c0d93312197fee16174a]). I think there is no best solution if we don't know data distribution in advance. Any idea about how a user supplies these boundaries for partitions? was (Author: maropu): > It depends on how much data skew you have. Yea, I agree with that. What I wanted to say is that whether the proposed one works well or not also depends on data distribution. > However, my question would be why skip using the lower bound anyway? It would > make more sense to use what the user supplied. IIUC we don't have a strong reason on that (we've used the initial implementation for a long time: [https://github.com/apache/spark/commit/8f471a66db0571a76a21c0d93312197fee16174a]). I think there is no best solution if we don't know data distribution in advance. Any idea about how a user supply these boundaries for partitions?
[jira] [Commented] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition
[ https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307496#comment-17307496 ] Takeshi Yamamuro commented on SPARK-34844: -- > It depends on how much data skew you have. Yea, I agree with that. What I wanted to say is that whether the proposed one works well or not also depends on data distribution. > However, my question would be why skip using the lower bound anyway? It would > make more sense to use what the user supplied. IIUC we don't have a strong reason on that (we've used the initial implementation for a long time: [https://github.com/apache/spark/commit/8f471a66db0571a76a21c0d93312197fee16174a]). I think there is no best solution if we don't know data distribution in advance. Any idea about how a user supply these boundaries for partitions?
[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34843: Attachment: SPARK-34843.patch
[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34843: Attachment: (was: SPARK-34843.patch)
[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307494#comment-17307494 ] Jason Yarbrough commented on SPARK-34843: - Updated the formula.
[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34843: Description: Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / (numPartitions - 2.0))).floor.toLong An example (using a date column): Say you're creating 1,000 partitions. If you provide a lower bound of 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 (translated to 18563), Spark determines the stride size as follows: (18563L / 1000L) - (-15611 / 1000L) = 33 Starting from the lower bound, doing 998 strides of 33, you end up at 2017-06-05 (17323). This is over 3 years of extra data that will go into the last partition, and depending on the shape of the data could cause a very long running task at the end of a job. Using the formula I'm proposing, you'd get: ((18563L / (1000L - 2.0)) - (-15611 / (1000L - 2.0))).floor.toLong = 34 This would put the upper bound at 2020-02-28 (18321), which is much closer to the original supplied upper bound. This is the best you can do to get as close as possible to the upper bound (without adjusting the number of partitions). For example, a stride size of 35 would go well past the supplied upper bound (over 2 years, 2022-11-22). 
In the above example, there is only a difference of 1 between the stride size using the current formula and the stride size using the proposed formula, but with greater distance between the lower and upper bounds, or a lower number of partitions, the difference can be much greater. was: Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = (upperBound - lowerBound) / (numPartitions - 2) An example (using a date column): Say you're creating 1,000 partitions. If you provide a lower bound of 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 (translated to 18563), Spark determines the stride size as follows: (18563L / 1000L) - (-15611 / 1000L) = 33 Starting from the lower bound, doing 998 strides of 33, you end up at 2017-06-05 (17323). This is over 3 years of extra data that will go into the last partition, and depending on the shape of the data could cause a very long running task at the end of a job. Using the formula I'm proposing, you'd get: (18563L - -15611) / (1000L - 2) = 34 This would put the upper bound at 2020-02-28 (18321), which is much closer to the original supplied upper bound. This is the best you can do to get as close as possible to the upper bound (without adjusting the number of partitions). For example, a stride size of 35 would go well past the supplied upper bound (over 2 years, 2022-11-22). 
In the above example, there is only a difference of 1 between the stride size using the current formula and the stride size using the proposed formula, but with greater distance between the lower and upper bounds, or a lower number of partitions, the difference can be much greater. > JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > Attachments: SPARK-34843.patch > > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = ((upperBound / (numPartitions - 2.0)) - (lowerBound / > (numPartitions - 2.0))).floor.toLong
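The arithmetic in the ticket can be reproduced with a short sketch (Python here purely for illustration; Spark's actual code is Scala, whose Long division truncates toward zero, which `int()` emulates below):

```python
import math

# Bounds from the ticket's example, as epoch days:
# 1927-04-05 -> -15611, 2020-10-27 -> 18563, with 1,000 partitions.
lower, upper, parts = -15611, 18563, 1000

# Current formula: each bound is divided separately with truncating
# integer division, so the stride falls short.
stride_current = int(upper / parts) - int(lower / parts)

# Proposed formula: floating-point division by (numPartitions - 2.0), then floor.
stride_proposed = math.floor(upper / (parts - 2.0) - lower / (parts - 2.0))

# Start of the last partition = lowerBound + 998 strides.
print(stride_current, lower + (parts - 2) * stride_current)    # 33 17323 (2017-06-05)
print(stride_proposed, lower + (parts - 2) * stride_proposed)  # 34 18321 (2020-02-28)
```

The sketch confirms the numbers quoted in the description: stride 33 leaves the last partition starting over three years early, while stride 34 lands at 18321.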
[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307493#comment-17307493 ] Jason Yarbrough commented on SPARK-34843: - That's a good example! Thank you for sharing that. I have a new formula that will handle that while also retaining the accuracy of the other proposed formula. > JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > Attachments: SPARK-34843.patch > > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = (upperBound - lowerBound) / (numPartitions - 2) > > An example (using a date column): > Say you're creating 1,000 partitions. If you provide a lower bound of > 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 > (translated to 18563), Spark determines the stride size as follows: > > (18563L / 1000L) - (-15611 / 1000L) = 33 > Starting from the lower bound, doing 998 strides of 33, you end up at > 2017-06-05 (17323). This is over 3 years of extra data that will go into the > last partition, and depending on the shape of the data could cause a very > long running task at the end of a job. > > Using the formula I'm proposing, you'd get: > (18563L - -15611) / (1000L - 2) = 34 > This would put the upper bound at 2020-02-28 (18321), which is much closer to > the original supplied upper bound. 
This is the best you can do to get as > close as possible to the upper bound (without adjusting the number of > partitions). For example, a stride size of 35 would go well past the supplied > upper bound (over 2 years, 2022-11-22). > > In the above example, there is only a difference of 1 between the stride size > using the current formula and the stride size using the proposed formula, but > with greater distance between the lower and upper bounds, or a lower number > of partitions, the difference can be much greater. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-34846) Sort output rows in SQLQueryTestSuite even if a plan has a Sort node
[ https://issues.apache.org/jira/browse/SPARK-34846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-34846: Assignee: Apache Spark > Sort output rows in SQLQueryTestSuite even if a plan has a Sort node > > > Key: SPARK-34846 > URL: https://issues.apache.org/jira/browse/SPARK-34846 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.2, 3.0.3 >Reporter: Takeshi Yamamuro >Assignee: Apache Spark >Priority: Minor > > This ticket aims at sorting output rows in SQLQueryTestSuite even if a given > plan has a `Sort` node. In that case, the current logic skips sorting output > rows (see the code below for details): > https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L503-L522 > But, I think we still need to sort output rows if the node just sorts output > rows by partial columns (instead of all columns). The current logic can lead > to flaky test behaviour.
[jira] [Commented] (SPARK-34846) Sort output rows in SQLQueryTestSuite even if a plan has a Sort node
[ https://issues.apache.org/jira/browse/SPARK-34846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307492#comment-17307492 ] Apache Spark commented on SPARK-34846: -- User 'maropu' has created a pull request for this issue: https://github.com/apache/spark/pull/31946 > Sort output rows in SQLQueryTestSuite even if a plan has a Sort node > > > Key: SPARK-34846 > URL: https://issues.apache.org/jira/browse/SPARK-34846 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.2, 3.0.3 >Reporter: Takeshi Yamamuro >Priority: Minor > > This ticket aims at sorting output rows in SQLQueryTestSuite even if a given > plan has a `Sort` node. In that case, the current logic skips sorting output > rows (see the code below for details): > https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L503-L522 > But, I think we still need to sort output rows if the node just sorts output > rows by partial columns (instead of all columns). The current logic can lead > to flaky test behaviour.
[jira] [Commented] (SPARK-34846) Sort output rows in SQLQueryTestSuite even if a plan has a Sort node
[ https://issues.apache.org/jira/browse/SPARK-34846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307490#comment-17307490 ] Apache Spark commented on SPARK-34846: -- User 'maropu' has created a pull request for this issue: https://github.com/apache/spark/pull/31946 > Sort output rows in SQLQueryTestSuite even if a plan has a Sort node > > > Key: SPARK-34846 > URL: https://issues.apache.org/jira/browse/SPARK-34846 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.2, 3.0.3 >Reporter: Takeshi Yamamuro >Priority: Minor > > This ticket aims at sorting output rows in SQLQueryTestSuite even if a given > plan has a `Sort` node. In that case, the current logic skips sorting output > rows (see the code below for details): > https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L503-L522 > But, I think we still need to sort output rows if the node just sorts output > rows by partial columns (instead of all columns). The current logic can lead > to flaky test behaviour.
[jira] [Assigned] (SPARK-34846) Sort output rows in SQLQueryTestSuite even if a plan has a Sort node
[ https://issues.apache.org/jira/browse/SPARK-34846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-34846: Assignee: (was: Apache Spark) > Sort output rows in SQLQueryTestSuite even if a plan has a Sort node > > > Key: SPARK-34846 > URL: https://issues.apache.org/jira/browse/SPARK-34846 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.2, 3.0.3 >Reporter: Takeshi Yamamuro >Priority: Minor > > This ticket aims at sorting output rows in SQLQueryTestSuite even if a given > plan has a `Sort` node. In that case, the current logic skips sorting output > rows (see the code below for details): > https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L503-L522 > But, I think we still need to sort output rows if the node just sorts output > rows by partial columns (instead of all columns). The current logic can lead > to flaky test behaviour.
[jira] [Created] (SPARK-34846) Sort output rows in SQLQueryTestSuite even if a plan has a Sort node
Takeshi Yamamuro created SPARK-34846: Summary: Sort output rows in SQLQueryTestSuite even if a plan has a Sort node Key: SPARK-34846 URL: https://issues.apache.org/jira/browse/SPARK-34846 Project: Spark Issue Type: Bug Components: SQL, Tests Affects Versions: 3.2.0, 3.1.2, 3.0.3 Reporter: Takeshi Yamamuro This ticket aims at sorting output rows in SQLQueryTestSuite even if a given plan has a `Sort` node. In that case, the current logic skips sorting output rows (see the code below for details): https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala#L503-L522 But, I think we still need to sort output rows if the node just sorts output rows by partial columns (instead of all columns). The current logic can lead to flaky test behaviour.
[jira] [Assigned] (SPARK-30641) Project Matrix: Linear Models revisit and refactor
[ https://issues.apache.org/jira/browse/SPARK-30641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhengruifeng reassigned SPARK-30641: Assignee: (was: zhengruifeng) > Project Matrix: Linear Models revisit and refactor > -- > > Key: SPARK-30641 > URL: https://issues.apache.org/jira/browse/SPARK-30641 > Project: Spark > Issue Type: New Feature > Components: ML, PySpark >Affects Versions: 3.1.0, 3.2.0 >Reporter: zhengruifeng >Priority: Major > > We have been refactoring linear models for a long time, and there is still > work to do in the future. After some discussion among [~huaxingao] [~srowen] > [~weichenxu123], we decided to gather the related work under a sub-project > Matrix, which includes: > # *Blockification (vectorization of vectors)* > ** Vectors are stacked into matrices, so that high-level BLAS can be used > for better performance (about ~3x faster on sparse datasets, up to ~18x > faster on dense datasets; see SPARK-31783 for details). > ** Since 3.1.1, LoR/SVC/LiR/AFT support blockification, and we need to > blockify KMeans in the future. > # *Standardization (virtual centering)* > ** The existing implementation of standardization in linear models does NOT center the > vectors by removing the means, for the purpose of keeping dataset > _*sparsity*_. However, this causes feature values with small variance to be > scaled to large values, and underlying solvers like LBFGS cannot efficiently > handle this case; see SPARK-34448 for details. > ** If internal vectors are centered (like other well-known implementations, i.e. > GLMNET/Scikit-Learn), convergence will be better. In the case in > SPARK-34448, the number of iterations to convergence is reduced from 93 > to 6. Moreover, the final solution is much closer to the one in GLMNET. > ** Luckily, we found a new way to _*virtually*_ center the vectors without > densifying the dataset. Good results have been observed in LoR, and we will take > it into account in other linear models.
> # _*Initialization (To be discussed)*_ > ** Initializing model coefficients from a given model should be beneficial to: 1, > convergence (it should reduce the number of iterations); 2, model stability > (it may obtain a new solution closer to the previous one); > # _*Early Stopping* *(To be discussed)*_ > ** We can compute the test error during training (like tree models do), and > stop the training procedure if the test error begins to increase. > > If you want to add other features to these models, please comment on > the ticket.
[jira] [Commented] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition
[ https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307484#comment-17307484 ] Jason Yarbrough commented on SPARK-34844: - It depends on how much data skew you have. I think the main thing here is that the code seems to needlessly lump the first stride into the lower partition, which is not expected behavior. If I set a lower bound of 1, it should be 1, not 2 (using the example above). I can understand that the last partition boundary will not be perfect due to the strides not lining up exactly (although the current formula can create very suboptimal results), but I don't understand why the lower bound is skipped for the first partition. When working with data that is skewed, these things can add up and make a big difference. We shaved 10 minutes off a 36-minute job by creating code that works around these issues (this includes SPARK-34843). That was 27% taken off, which would be huge at larger scale. The lower bound will have less of a performance impact (if you don't hit memory issues) since other tasks will still be processing while the first partition is taking a long time. However, my question would be: why skip using the lower bound anyway? It would make more sense to use what the user supplied. > JDBCRelation columnPartition function includes the first stride in the lower > partition > -- > > Key: SPARK-34844 > URL: https://issues.apache.org/jira/browse/SPARK-34844 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > > Currently, columnPartition in JDBCRelation contains logic that adds the first > stride into the lower partition. Because of this, the lower bound isn't used > as the ceiling for the lower partition. > For example, say we have data 0-10, 10 partitions, and the lowerBound is set > to 1. The lower/first partition should contain anything < 1. However, in the > current implementation, it would include anything < 2. > A possible easy fix would be changing the following code on line 132: > currentValue += stride > To: > if (i != 0) currentValue += stride > Or include currentValue += stride within the if statement on line 131... > although this creates a pretty bad looking side-effect.
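The reported behavior can be made concrete with a simplified model of the boundary loop (a Python sketch, not Spark's actual Scala; the `skip_first_stride` flag models the proposed `if (i != 0)` change, and the predicate strings are illustrative):

```python
def predicates(lower, upper, num_parts, skip_first_stride=False):
    """Simplified model of JDBCRelation.columnPartition's boundary loop."""
    # Current stride formula, with truncating division as in Scala.
    stride = int(upper / num_parts) - int(lower / num_parts)
    current = lower
    out = []
    for i in range(num_parts):
        l_clause = None if i == 0 else f"col >= {current}"
        # The reported issue: the first stride is always added, so the first
        # partition's ceiling becomes lower + stride instead of lower.
        if not (skip_first_stride and i == 0):
            current += stride
        u_clause = None if i == num_parts - 1 else f"col < {current}"
        out.append(" AND ".join(c for c in (l_clause, u_clause) if c))
    return out

# lowerBound = 1, upperBound = 10, 10 partitions -> stride 1
print(predicates(1, 10, 10)[0])                          # col < 2  (current behavior)
print(predicates(1, 10, 10, skip_first_stride=True)[0])  # col < 1  (proposed)
```

With the flag off, the first partition's WHERE clause is `col < 2` even though the user supplied a lower bound of 1, matching the example in the ticket.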
[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307483#comment-17307483 ] Takeshi Yamamuro commented on SPARK-34843: -- How about `upperBound=Long.MaxValue, lowerBound=Long.MinValue`? > JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > Attachments: SPARK-34843.patch > > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = (upperBound - lowerBound) / (numPartitions - 2) > > An example (using a date column): > Say you're creating 1,000 partitions. If you provide a lower bound of > 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 > (translated to 18563), Spark determines the stride size as follows: > > (18563L / 1000L) - (-15611 / 1000L) = 33 > Starting from the lower bound, doing 998 strides of 33, you end up at > 2017-06-05 (17323). This is over 3 years of extra data that will go into the > last partition, and depending on the shape of the data could cause a very > long running task at the end of a job. > > Using the formula I'm proposing, you'd get: > (18563L - -15611) / (1000L - 2) = 34 > This would put the upper bound at 2020-02-28 (18321), which is much closer to > the original supplied upper bound. 
This is the best you can do to get as > close as possible to the upper bound (without adjusting the number of > partitions). For example, a stride size of 35 would go well past the supplied > upper bound (over 2 years, 2022-11-22). > > In the above example, there is only a difference of 1 between the stride size > using the current formula and the stride size using the proposed formula, but > with greater distance between the lower and upper bounds, or a lower number > of partitions, the difference can be much greater.
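The overflow Takeshi is pointing at can be sketched as follows (Python, with helpers that emulate Scala's wrapping 64-bit Long arithmetic and truncating division; the helper names are illustrative, not Spark's):

```python
I64_MIN, I64_MAX = -2**63, 2**63 - 1

def to_i64(x):
    """Wrap an unbounded int into signed 64-bit, as Scala Long arithmetic does."""
    return (x - I64_MIN) % 2**64 + I64_MIN

def tdiv(a, b):
    """Division truncating toward zero, matching Scala's Long `/`."""
    q = abs(a) // abs(b)
    return q if (a < 0) == (b < 0) else -q

parts = 1000

# (upperBound - lowerBound) overflows for extreme bounds: the true difference
# 2**64 - 1 wraps around to -1 in 64-bit arithmetic, so the derived stride
# is nonsense.
overflowed = to_i64(I64_MAX - I64_MIN)
print(overflowed)                   # -1
print(tdiv(overflowed, parts - 2))  # 0, instead of a huge positive stride

# The current formula divides each bound before subtracting, so the
# intermediate values stay far away from the Long range limits.
print(tdiv(I64_MAX, parts) - tdiv(I64_MIN, parts))  # 18446744073709550
```

This is why subtracting the bounds first is unsafe at the extremes, while dividing each bound first, as the current formula does, cannot overflow.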
[jira] [Commented] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition
[ https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307479#comment-17307479 ] Takeshi Yamamuro commented on SPARK-34844: -- Can the current logic cause any issue? I feel there is no big difference between the current one and the proposed one, though. > JDBCRelation columnPartition function includes the first stride in the lower > partition > -- > > Key: SPARK-34844 > URL: https://issues.apache.org/jira/browse/SPARK-34844 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > > Currently, columnPartition in JDBCRelation contains logic that adds the first > stride into the lower partition. Because of this, the lower bound isn't used > as the ceiling for the lower partition. > For example, say we have data 0-10, 10 partitions, and the lowerBound is set > to 1. The lower/first partition should contain anything < 1. However, in the > current implementation, it would include anything < 2. > A possible easy fix would be changing the following code on line 132: > currentValue += stride > To: > if (i != 0) currentValue += stride > Or include currentValue += stride within the if statement on line 131... > although this creates a pretty bad looking side-effect.
[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307477#comment-17307477 ] Jason Yarbrough commented on SPARK-34843: - >From what I can see, there is protection in the code above the stride formula >that makes sure the partition count isn't greater than upperBound - >lowerBound. There is also code above that to make sure that someone doesn't >pass in a numPartition of 1. I'm having a hard time wrapping my head around where this proposed formula would cause an issue, but I'm hoping you, or someone, could help clarify. Would you be able to provide an example of the overflow issue in this case? > JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > Attachments: SPARK-34843.patch > > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = (upperBound - lowerBound) / (numPartitions - 2) > > An example (using a date column): > Say you're creating 1,000 partitions. If you provide a lower bound of > 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 > (translated to 18563), Spark determines the stride size as follows: > > (18563L / 1000L) - (-15611 / 1000L) = 33 > Starting from the lower bound, doing 998 strides of 33, you end up at > 2017-06-05 (17323). 
This is over 3 years of extra data that will go into the > last partition, and depending on the shape of the data could cause a very > long running task at the end of a job. > > Using the formula I'm proposing, you'd get: > (18563L - -15611) / (1000L - 2) = 34 > This would put the upper bound at 2020-02-28 (18321), which is much closer to > the original supplied upper bound. This is the best you can do to get as > close as possible to the upper bound (without adjusting the number of > partitions). For example, a stride size of 35 would go well past the supplied > upper bound (over 2 years, 2022-11-22). > > In the above example, there is only a difference of 1 between the stride size > using the current formula and the stride size using the proposed formula, but > with greater distance between the lower and upper bounds, or a lower number > of partitions, the difference can be much greater.
[jira] [Commented] (SPARK-34835) Support TimestampType in UDT
[ https://issues.apache.org/jira/browse/SPARK-34835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307471#comment-17307471 ] Takeshi Yamamuro commented on SPARK-34835: -- Please fill in the description. > Support TimestampType in UDT > > > Key: SPARK-34835 > URL: https://issues.apache.org/jira/browse/SPARK-34835 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 3.0.2, 3.1.1 >Reporter: Darcy Shen >Priority: Major >
[jira] [Commented] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307469#comment-17307469 ] Takeshi Yamamuro commented on SPARK-34843: -- To mitigate overflow issues, I think we should keep using the current formula. > JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > Attachments: SPARK-34843.patch > > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = (upperBound - lowerBound) / (numPartitions - 2) > > An example (using a date column): > Say you're creating 1,000 partitions. If you provide a lower bound of > 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 > (translated to 18563), Spark determines the stride size as follows: > > (18563L / 1000L) - (-15611 / 1000L) = 33 > Starting from the lower bound, doing 998 strides of 33, you end up at > 2017-06-05 (17323). This is over 3 years of extra data that will go into the > last partition, and depending on the shape of the data could cause a very > long running task at the end of a job. > > Using the formula I'm proposing, you'd get: > (18563L - -15611) / (1000L - 2) = 34 > This would put the upper bound at 2020-02-28 (18321), which is much closer to > the original supplied upper bound. 
This is the best you can do to get as > close as possible to the upper bound (without adjusting the number of > partitions). For example, a stride size of 35 would go well past the supplied > upper bound (over 2 years, 2022-11-22). > > In the above example, there is only a difference of 1 between the stride size > using the current formula and the stride size using the proposed formula, but > with greater distance between the lower and upper bounds, or a lower number > of partitions, the difference can be much greater.
[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34843: Attachment: SPARK-34843.patch > JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > Attachments: SPARK-34843.patch > > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = (upperBound - lowerBound) / (numPartitions - 2) > > An example (using a date column): > Say you're creating 1,000 partitions. If you provide a lower bound of > 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 > (translated to 18563), Spark determines the stride size as follows: > > (18563L / 1000L) - (-15611 / 1000L) = 33 > Starting from the lower bound, doing 998 strides of 33, you end up at > 2017-06-05 (17323). This is over 3 years of extra data that will go into the > last partition, and depending on the shape of the data could cause a very > long running task at the end of a job. > > Using the formula I'm proposing, you'd get: > (18563L - -15611) / (1000L - 2) = 34 > This would put the upper bound at 2020-02-28 (18321), which is much closer to > the original supplied upper bound. This is the best you can do to get as > close as possible to the upper bound (without adjusting the number of > partitions). 
For example, a stride size of 35 would go well past the supplied > upper bound (over 2 years, 2022-11-22). > > In the above example, there is only a difference of 1 between the stride size > using the current formula and the stride size using the proposed formula, but > with greater distance between the lower and upper bounds, or a lower number > of partitions, the difference can be much greater.
[jira] [Updated] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition
[ https://issues.apache.org/jira/browse/SPARK-34844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34844: Description: Currently, columnPartition in JDBCRelation contains logic that adds the first stride into the lower partition. Because of this, the lower bound isn't used as the ceiling for the lower partition. For example, say we have data 0-10, 10 partitions, and the lowerBound is set to 1. The lower/first partition should contain anything < 1. However, in the current implementation, it would include anything < 2. A possible easy fix would be changing the following code on line 132: currentValue += stride To: if (i != 0) currentValue += stride Or include currentValue += stride within the if statement on line 131... although this creates a pretty bad looking side-effect. was: Currently, columnPartition in JDBCRelation contains logic that adds the first stride into the lower partition. Because of this, the lower bound isn't used as the ceiling for the lower partition. For example, say we have data 0-10, 10 partitions, and the lowerBound is set to 1. The lower/first partition should contain anything < 1. However, in the current implementation, it would include anything < 2. A possible easy fix would be changing the following code on line 132: currentValue += stride To: if (i != 0) currentValue += stride Or include currentValue += stride within the if statement on line 131... although this creates a pretty nasty looking side-effect. > JDBCRelation columnPartition function includes the first stride in the lower > partition > -- > > Key: SPARK-34844 > URL: https://issues.apache.org/jira/browse/SPARK-34844 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > > Currently, columnPartition in JDBCRelation contains logic that adds the first > stride into the lower partition. 
Because of this, the lower bound isn't used > as the ceiling for the lower partition. > For example, say we have data 0-10, 10 partitions, and the lowerBound is set > to 1. The lower/first partition should contain anything < 1. However, in the > current implementation, it would include anything < 2. > A possible easy fix would be changing the following code on line 132: > currentValue += stride > To: > if (i != 0) currentValue += stride > Or include currentValue += stride within the if statement on line 131... > although this creates a pretty bad looking side-effect.
[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34843: Description: Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = (upperBound - lowerBound) / (numPartitions - 2) An example (using a date column): Say you're creating 1,000 partitions. If you provide a lower bound of 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 (translated to 18563), Spark determines the stride size as follows: (18563L / 1000L) - (-15611 / 1000L) = 33 Starting from the lower bound, doing 998 strides of 33, you end up at 2017-06-05 (17323). This is over 3 years of extra data that will go into the last partition, and depending on the shape of the data could cause a very long running task at the end of a job. Using the formula I'm proposing, you'd get: (18563L - -15611) / (1000L - 2) = 34 This would put the upper bound at 2020-02-28 (18321), which is much closer to the original supplied upper bound. This is the best you can do to get as close as possible to the upper bound (without adjusting the number of partitions). For example, a stride size of 35 would go well past the supplied upper bound (over 2 years, 2022-11-22). In the above example, there is only a difference of 1 between the stride size using the current formula and the stride size using the proposed formula, but with greater distance between the lower and upper bounds, or a lower number of partitions, the difference can be much greater. 
was: Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) An example (using a date column): Say you're creating 1,000 partitions. If you provide a lower bound of 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 (translated to 18563), Spark determines the stride size as follows: (18563L / 1000L) - (-15611 / 1000L) = 33 Starting from the lower bound, doing 998 strides of 33, you end up at 2017-06-05 (17323). This is over 3 years of extra data that will go into the last partition, and depending on the shape of the data could cause a very long running task at the end of a job. Using the formula I'm proposing, you'd get: (18563L - -15611) / (1000L - 1) = 34 This would put the upper bound at 2020-02-28 (18321), which is much closer to the original supplied upper bound. This is the best you can do to get as close as possible to the upper bound (without adjusting the number of partitions). For example, a stride size of 35 would go well past the supplied upper bound (over 2 years, 2022-11-22). In the above example, there is only a difference of 1 between the stride size using the current formula and the stride size using the proposed formula, but with greater distance between the lower and upper bounds, or a lower number of partitions, the difference can be much greater. 
> JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = (upperBound - lowerBound) / (numPartitions - 2) > > An example (using a date column): > Say you're creating 1,000 partitions. If you provide a lower bound of > 1927-04-05 (this gets
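The arithmetic in the ticket's date example can be checked directly. This is a small Python sketch, not Spark code: `trunc_div` mimics Scala's `Long` division, which truncates toward zero (Python's `//` floors instead, which matters for the negative lower bound). Note the ticket's revisions propose both `numPartitions - 1` and `numPartitions - 2` as the denominator; for these inputs both yield a stride of 34.

```python
lower, upper, n = -15611, 18563, 1000  # 1927-04-05 and 2020-10-27 as epoch days

def trunc_div(a, b):
    # Scala integer division truncates toward zero; Python's // floors.
    return int(a / b)

# Current formula: truncation on each division loses most of the range.
current_stride = trunc_div(upper, n) - trunc_div(lower, n)
print(current_stride)                # 33
print(lower + 998 * current_stride)  # 17323, i.e. 2017-06-05

# Proposed formula: divide the full range once.
proposed_stride = trunc_div(upper - lower, n - 1)
print(proposed_stride)               # 34
print(lower + 998 * proposed_stride) # 18321, i.e. 2020-02-28
assert trunc_div(upper - lower, n - 2) == 34  # the (n - 2) variant agrees here
```

The last partition's start moves from 17323 to 18321 epoch days, recovering roughly three years of range that the current formula pushes into the final partition.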
[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34843: Description: Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) An example (using a date column): Say you're creating 1,000 partitions. If you provide a lower bound of 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 (translated to 18563), Spark determines the stride size as follows: (18563L / 1000L) - (-15611 / 1000L) = 33 Starting from the lower bound, doing 998 strides of 33, you end up at 2017-06-05 (17323). This is over 3 years of extra data that will go into the last partition, and depending on the shape of the data could cause a very long running task at the end of a job. Using the formula I'm proposing, you'd get: (18563L - -15611) / (1000L - 1) = 34 This would put the upper bound at 2020-02-28 (18321), which is much closer to the original supplied upper bound. This is the best you can do to get as close as possible to the upper bound (without adjusting the number of partitions). For example, a stride size of 35 would go well past the supplied upper bound (over 2 years, 2022-11-22). In the above example, there is only a difference of 1 between the stride size using the current formula and the stride size using the proposed formula, but with greater distance between the lower and upper bounds, or a lower number of partitions, the difference can be much greater. 
was: Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) An example (using a date column): Say you're creating 1,000 partitions. If you provide a lower bound of 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 (translated to 18563), Spark determines the stride size as follows: (18563L / 1000L) - (-15611 / 1000L) = 33 Starting from the lower bound, doing 998 strides of 33, you end up at 2017-06-05 (17323). This is over 3 years of extra data that will go into the last partition, and depending on the shape of the data could cause a very long running task at the end of a job. Using the formula I'm proposing, you'd get: (18563L - -15611) / (1000L - 1) = 34 This would put the upper bound at 2020-02-28 (18321), which is much closer to the original supplied upper bound. This is the best you can do to get as close as possible to the upper bound (without adjusting the number of partitions). For example, a stride size of 35 would go well past the supplied upper bound (over 2 years). In the above example, there is only a difference of 1 between the stride size using the current formula and the stride size using the proposed formula, but with greater distance between the lower and upper bounds, or a lower number of partitions, the difference can be much greater. 
> JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) > > An example (using a date column): > Say you're creating 1,000 partitions. If you provide a lower bound of > 1927-04-05 (this gets translated to
[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34843: Description: Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) An example (using a date column): Say you're creating 1,000 partitions. If you provide a lower bound of 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 (translated to 18563), Spark determines the stride size as follows: (18563L / 1000L) - (-15611 / 1000L) = 33 Starting from the lower bound, doing 998 strides of 33, you end up at 2017-06-05 (17323). This is over 3 years of extra data that will go into the last partition, and depending on the shape of the data could cause a very long running task at the end of a job. Using the formula I'm proposing, you'd get: (18563L - -15611) / (1000L - 1) = 34 This would put the upper bound at 2020-02-28 (18321), which is much closer to the original supplied upper bound. This is the best you can do to get as close as possible to the upper bound (without adjusting the number of partitions). For example, a stride size of 35 would go well past the supplied upper bound (over 2 years). In the above example, there is only a difference of 1 between the stride size using the current formula and the stride size using the proposed formula, but with greater distance between the lower and upper bounds, or a lower number of partitions, the difference can be much greater. 
was: Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) An example (using a date column): Say you're creating 1,000 partitions. If you provide a lower bound of 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 (translated to 18563), Spark determines the stride size as follows: (18563L / 1000L) - (-15611 / 1000L) = 33 Starting from the lower bound, doing 998 strides of 33, you end up at 2017-07-08 (17356). This is over 3 years of extra data that will go into the last partition, and depending on the shape of the data could cause a very long running task at the end of a job. Using the formula I'm proposing, you'd get: (18563L - -15611) / (1000L - 1) = 34 This would put the upper bound at 2020-04-02, which is much closer to the original supplied upper bound. This is the best you can do to get as close as possible to the upper bound (without adjusting the number of partitions). For example, a stride size of 35 would go well past the supplied upper bound (over 2 years). In the above example, there is only a difference of 1 between the stride size using the current formula and the stride size using the proposed formula, but with greater distance between the lower and upper bounds, or a lower number of partitions, the difference can be much greater. 
> JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) > > An example (using a date column): > Say you're creating 1,000 partitions. If you provide a lower bound of > 1927-04-05 (this gets translated to -15611), and an
[jira] [Updated] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
[ https://issues.apache.org/jira/browse/SPARK-34843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Yarbrough updated SPARK-34843: Description: Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) An example (using a date column): Say you're creating 1,000 partitions. If you provide a lower bound of 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 (translated to 18563), Spark determines the stride size as follows: (18563L / 1000L) - (-15611 / 1000L) = 33 Starting from the lower bound, doing 998 strides of 33, you end up at 2017-07-08 (17356). This is over 3 years of extra data that will go into the last partition, and depending on the shape of the data could cause a very long running task at the end of a job. Using the formula I'm proposing, you'd get: (18563L - -15611) / (1000L - 1) = 34 This would put the upper bound at 2020-04-02, which is much closer to the original supplied upper bound. This is the best you can do to get as close as possible to the upper bound (without adjusting the number of partitions). For example, a stride size of 35 would go well past the supplied upper bound (over 2 years). In the above example, there is only a difference of 1 between the stride size using the current formula and the stride size using the proposed formula, but with greater distance between the lower and upper bounds, or a lower number of partitions, the difference can be much greater. 
was: Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) > JDBCRelation columnPartition function improperly determines stride size > --- > > Key: SPARK-34843 > URL: https://issues.apache.org/jira/browse/SPARK-34843 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Jason Yarbrough >Priority: Minor > > Currently, in JDBCRelation (line 123), the stride size is calculated as > follows: > val stride: Long = upperBound / numPartitions - lowerBound / numPartitions > > Due to truncation happening on both divisions, the stride size can fall short > of what it should be. This can lead to a big difference between the provided > upper bound and the actual start of the last partition. > I propose this formula, as it is much more accurate and leads to better > distribution: > val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) > > An example (using a date column): > Say you're creating 1,000 partitions. If you provide a lower bound of > 1927-04-05 (this gets translated to -15611), and an upper bound of 2020-10-27 > (translated to 18563), Spark determines the stride size as follows: > > (18563L / 1000L) - (-15611 / 1000L) = 33 > Starting from the lower bound, doing 998 strides of 33, you end up at > 2017-07-08 (17356). This is over 3 years of extra data that will go into the > last partition, and depending on the shape of the data could cause a very > long running task at the end of a job. 
> > Using the formula I'm proposing, you'd get: > (18563L - -15611) / (1000L - 1) = 34 > This would put the upper bound at 2020-04-02, which is much closer to the > original supplied upper bound. This is the best you can do to get as close as > possible to the upper bound (without adjusting the number of partitions). For > example, a stride size of 35 would go well past the supplied upper bound > (over 2 years). > > In the above example, there is only a difference of 1 between the stride size > using the current formula and the stride size using the proposed formula, but > with greater distance between the lower and upper bounds, or a lower number > of partitions, the difference can be much greater.
[jira] [Commented] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing
[ https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307456#comment-17307456 ] Apache Spark commented on SPARK-34845: -- User 'baohe-zhang' has created a pull request for this issue: https://github.com/apache/spark/pull/31945 > ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of > child pids metrics are missing > > > Key: SPARK-34845 > URL: https://issues.apache.org/jira/browse/SPARK-34845 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1 >Reporter: Baohe Zhang >Priority: Major > > When the procfs metrics of some child pids are unavailable, > ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum > of a subset of child pids), instead of an all 0 result. This can be > misleading and is undesired per the current code comments in > [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214]. > How to reproduce it? > This unit test is kind of self-explanatory: > {code:java} > val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics")) > val mockedP = spy(p) > // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be > returned > var ptree = Set(26109, 22764, 22763) > when(mockedP.computeProcessTree).thenReturn(ptree) > var r = mockedP.computeAllMetrics > assert(r.jvmVmemTotal == 0) > assert(r.jvmRSSTotal == 0) > assert(r.pythonVmemTotal == 0) > assert(r.pythonRSSTotal == 0) > {code} > In the current implementation, computeAllMetrics will reset the allMetrics to > 0 when processing 22764 because 22764's proc file doesn't exist, but then it > will continue processing pid 22763, and update allMetrics to procfs metrics > of pid 22763. > Also, a side effect of this bug is that it can lead to a verbose warning log > if many pids' stat files are missing. 
Terminating early can make the > warning logs more concise. > How to solve it? > The issue can be fixed by throwing an IOException up to computeAllMetrics(); in > that case, computeAllMetrics can be aware that at least one child pid's procfs > metrics are missing and then terminate the metrics reporting.
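The proposed early termination can be sketched as follows. This is a hypothetical Python model, not Spark's Scala implementation: `read_metrics` stands in for parsing `/proc/<pid>/stat` and raises `OSError` when a pid's proc file is missing, and the aggregator then returns all-zero totals instead of a partial sum.

```python
def compute_all_metrics(pids, read_metrics):
    # Model of the proposed fix: a missing child pid's procfs data aborts
    # aggregation and yields all-zero totals, never a partial sum.
    totals = {"vmem": 0, "rss": 0}
    try:
        for pid in pids:
            metrics = read_metrics(pid)  # may raise OSError (missing proc file)
            totals["vmem"] += metrics["vmem"]
            totals["rss"] += metrics["rss"]
    except OSError:
        # Terminate early: one warning path instead of one per missing pid,
        # and no misleading partial result.
        return {"vmem": 0, "rss": 0}
    return totals

# Mirror of the unit test in the ticket: pid 22764 has no proc file.
stats = {26109: {"vmem": 100, "rss": 10}, 22763: {"vmem": 200, "rss": 20}}

def read_metrics(pid):
    if pid not in stats:
        raise OSError(f"/proc/{pid}/stat not found")
    return stats[pid]

print(compute_all_metrics([26109, 22764, 22763], read_metrics))  # prints {'vmem': 0, 'rss': 0}
```

Because the exception propagates past the per-pid loop, pid 22763 is never summed after 22764 fails, which is exactly the behavior the ticket's assertions expect.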
[jira] [Assigned] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing
[ https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-34845: Assignee: Apache Spark > ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of > child pids metrics are missing > > > Key: SPARK-34845 > URL: https://issues.apache.org/jira/browse/SPARK-34845 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1 >Reporter: Baohe Zhang >Assignee: Apache Spark >Priority: Major > > When the procfs metrics of some child pids are unavailable, > ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum > of a subset of child pids), instead of an all 0 result. This can be > misleading and is undesired per the current code comments in > [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214]. > How to reproduce it? > This unit test is kind of self-explanatory: > {code:java} > val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics")) > val mockedP = spy(p) > // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be > returned > var ptree = Set(26109, 22764, 22763) > when(mockedP.computeProcessTree).thenReturn(ptree) > var r = mockedP.computeAllMetrics > assert(r.jvmVmemTotal == 0) > assert(r.jvmRSSTotal == 0) > assert(r.pythonVmemTotal == 0) > assert(r.pythonRSSTotal == 0) > {code} > In the current implementation, computeAllMetrics will reset the allMetrics to > 0 when processing 22764 because 22764's proc file doesn't exist, but then it > will continue processing pid 22763, and update allMetrics to procfs metrics > of pid 22763. > Also, a side effect of this bug is that it can lead to a verbose warning log > if many pids' stat files are missing. An early terminating can make the > warning logs more concise. > How to solve it? 
> The issue can be fixed by throwing an IOException up to computeAllMetrics(); in > that case, computeAllMetrics can be aware that at least one child pid's procfs > metrics are missing and then terminate the metrics reporting.
[jira] [Assigned] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing
[ https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-34845: Assignee: (was: Apache Spark) > ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of > child pids metrics are missing > > > Key: SPARK-34845 > URL: https://issues.apache.org/jira/browse/SPARK-34845 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1 >Reporter: Baohe Zhang >Priority: Major > > When the procfs metrics of some child pids are unavailable, > ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum > of a subset of child pids), instead of an all 0 result. This can be > misleading and is undesired per the current code comments in > [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214]. > How to reproduce it? > This unit test is kind of self-explanatory: > {code:java} > val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics")) > val mockedP = spy(p) > // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be > returned > var ptree = Set(26109, 22764, 22763) > when(mockedP.computeProcessTree).thenReturn(ptree) > var r = mockedP.computeAllMetrics > assert(r.jvmVmemTotal == 0) > assert(r.jvmRSSTotal == 0) > assert(r.pythonVmemTotal == 0) > assert(r.pythonRSSTotal == 0) > {code} > In the current implementation, computeAllMetrics will reset the allMetrics to > 0 when processing 22764 because 22764's proc file doesn't exist, but then it > will continue processing pid 22763, and update allMetrics to procfs metrics > of pid 22763. > Also, a side effect of this bug is that it can lead to a verbose warning log > if many pids' stat files are missing. An early terminating can make the > warning logs more concise. > How to solve it? 
> The issue can be fixed by throwing an IOException up to computeAllMetrics(); in > that case, computeAllMetrics can be aware that at least one child pid's procfs > metrics are missing and then terminate the metrics reporting.
[jira] [Commented] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing
[ https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307454#comment-17307454 ] Apache Spark commented on SPARK-34845: -- User 'baohe-zhang' has created a pull request for this issue: https://github.com/apache/spark/pull/31945 > ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of > child pids metrics are missing > > > Key: SPARK-34845 > URL: https://issues.apache.org/jira/browse/SPARK-34845 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1 >Reporter: Baohe Zhang >Priority: Major > > When the procfs metrics of some child pids are unavailable, > ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum > of a subset of child pids), instead of an all 0 result. This can be > misleading and is undesired per the current code comments in > [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214]. > How to reproduce it? > This unit test is kind of self-explanatory: > {code:java} > val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics")) > val mockedP = spy(p) > // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be > returned > var ptree = Set(26109, 22764, 22763) > when(mockedP.computeProcessTree).thenReturn(ptree) > var r = mockedP.computeAllMetrics > assert(r.jvmVmemTotal == 0) > assert(r.jvmRSSTotal == 0) > assert(r.pythonVmemTotal == 0) > assert(r.pythonRSSTotal == 0) > {code} > In the current implementation, computeAllMetrics will reset the allMetrics to > 0 when processing 22764 because 22764's proc file doesn't exist, but then it > will continue processing pid 22763, and update allMetrics to procfs metrics > of pid 22763. > Also, a side effect of this bug is that it can lead to a verbose warning log > if many pids' stat files are missing. 
Terminating early can make the > warning logs more concise. > How to solve it? > The issue can be fixed by throwing an IOException up to computeAllMetrics(); in > that case, computeAllMetrics can be aware that at least one child pid's procfs > metrics are missing and then terminate the metrics reporting.
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307453#comment-17307453 ] Devaraj Kavali commented on SPARK-18105: [~dongjoon] We are seeing this error while running workloads with ~10TB shuffle data, we are not setting spark.io.encryption.enabled=true. > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: Davies Liu >Priority: Major > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at > org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at > org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at > org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at > 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing
[ https://issues.apache.org/jira/browse/SPARK-34845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Baohe Zhang updated SPARK-34845: Description: When the procfs metrics of some child pids are unavailable, ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum of a subset of child pids), instead of an all 0 result. This can be misleading and is undesired per the current code comments in [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214]. How to reproduce it? This unit test is largely self-explanatory: {code:java} val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics")) val mockedP = spy(p) // proc file of pid 22764 doesn't exist, so partial metrics shouldn't be returned var ptree = Set(26109, 22764, 22763) when(mockedP.computeProcessTree).thenReturn(ptree) var r = mockedP.computeAllMetrics assert(r.jvmVmemTotal == 0) assert(r.jvmRSSTotal == 0) assert(r.pythonVmemTotal == 0) assert(r.pythonRSSTotal == 0) {code} In the current implementation, computeAllMetrics resets allMetrics to 0 when processing 22764 because 22764's proc file doesn't exist, but it then continues with pid 22763 and updates allMetrics to the procfs metrics of pid 22763. Also, a side effect of this bug is that it can lead to verbose warning logs if many pids' stat files are missing; terminating early makes the warning logs more concise. How to solve it? The issue can be fixed by propagating the IOException up to computeAllMetrics(); computeAllMetrics is then aware that at least one child pid's procfs metrics are missing and can terminate the metrics reporting. was: When the procfs metrics of some child pids are unavailable, ProcfsMetricsGetter.computeAllMetrics() returns partial metrics (the sum of a subset of child pids), instead of an all 0 result. 
This can be misleading and is undesired per the current code comments in [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214]. Also, a side effect of it is that it can lead to verbose warning logs if many pids' stat files are missing. {code:java} e.g. 2021-03-21 16:58:25,422 [pool-26-thread-8] WARN org.apache.spark.executor.ProcfsMetricsGetter - There was a problem with reading the stat file of the process. java.io.FileNotFoundException: /proc/742/stat (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.<init>(FileInputStream.java:138) at org.apache.spark.executor.ProcfsMetricsGetter.openReader$1(ProcfsMetricsGetter.scala:203) at org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$addProcfsMetricsFromOneProcess$1(ProcfsMetricsGetter.scala:205) at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2540) at org.apache.spark.executor.ProcfsMetricsGetter.addProcfsMetricsFromOneProcess(ProcfsMetricsGetter.scala:205) at org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$computeAllMetrics$1(ProcfsMetricsGetter.scala:297) {code} The issue can be fixed by updating the flag isAvailable to false when one of the child pids' procfs metrics is unavailable. The methods computePid, computePageSize, and getChildPids already have this behavior. 
Summary: ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of child pids metrics are missing (was: ProcfsMetricsGetter.computeAllMetrics shouldn't return partial metrics when some of child pids metrics are missing) > ProcfsMetricsGetter.computeAllMetrics may return partial metrics when some of > child pids metrics are missing > > > Key: SPARK-34845 > URL: https://issues.apache.org/jira/browse/SPARK-34845 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1 >Reporter: Baohe Zhang >Priority: Major > > When the procfs metrics of some child pids are unavailable, > ProcfsMetricsGetter.computeAllMetrics() may return partial metrics (the sum > of a subset of child pids), instead of an all 0 result. This can be > misleading and is undesired per the current code comments in > [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214]. > How to reproduce it? > This unit test is kind of self-explanatory: > > {code:java} > val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics")) > val mockedP = spy(p) > // proc file of pid
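The control flow described above can be sketched language-neutrally in Python; the helper names below are illustrative stand-ins, not Spark's API (the real logic lives in ProcfsMetricsGetter.scala):

```python
# Sketch of the accumulation bug and the proposed early-termination fix.
PROC_STAT = {26109: 100, 22763: 50}  # pid -> fake RSS; pid 22764 has no stat file

def read_proc_stat(pid):
    """Stand-in for reading /proc/<pid>/stat; raises like a missing file."""
    if pid not in PROC_STAT:
        raise IOError(f"/proc/{pid}/stat (No such file or directory)")
    return PROC_STAT[pid]

def compute_all_metrics_buggy(pids):
    total = 0
    for pid in sorted(pids):
        try:
            total += read_proc_stat(pid)
        except IOError:
            total = 0  # reset... but the loop keeps going, so later pids
    return total       # leak back in and a partial sum is reported

def compute_all_metrics_fixed(pids):
    # Proposed behavior: one missing pid invalidates the whole report.
    try:
        return sum(read_proc_stat(pid) for pid in sorted(pids))
    except IOError:
        return 0
```

With pids {26109, 22764, 22763} the buggy version reports the partial sum for pid 26109 alone, while the fixed version reports all zeros as the code comments intend.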
[jira] [Created] (SPARK-34845) ProcfsMetricsGetter.computeAllMetrics shouldn't return partial metrics when some of child pids metrics are missing
Baohe Zhang created SPARK-34845: --- Summary: ProcfsMetricsGetter.computeAllMetrics shouldn't return partial metrics when some of child pids metrics are missing Key: SPARK-34845 URL: https://issues.apache.org/jira/browse/SPARK-34845 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.1.1, 3.1.0, 3.0.2, 3.0.1, 3.0.0 Reporter: Baohe Zhang When the procfs metrics of some child pids are unavailable, ProcfsMetricsGetter.computeAllMetrics() returns partial metrics (the sum of a subset of child pids), instead of an all 0 result. This can be misleading and is undesired per the current code comments in [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala#L214]. Also, a side effect of it is that it can lead to verbose warning logs if many pids' stat files are missing. {noformat} e.g. 2021-03-21 16:58:25,422 [pool-26-thread-8] WARN org.apache.spark.executor.ProcfsMetricsGetter - There was a problem with reading the stat file of the process. 
java.io.FileNotFoundException: /proc/742/stat (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.<init>(FileInputStream.java:138) at org.apache.spark.executor.ProcfsMetricsGetter.openReader$1(ProcfsMetricsGetter.scala:203) at org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$addProcfsMetricsFromOneProcess$1(ProcfsMetricsGetter.scala:205) at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2540) at org.apache.spark.executor.ProcfsMetricsGetter.addProcfsMetricsFromOneProcess(ProcfsMetricsGetter.scala:205) at org.apache.spark.executor.ProcfsMetricsGetter.$anonfun$computeAllMetrics$1(ProcfsMetricsGetter.scala:297){noformat} The issue can be fixed by updating the flag isAvailable to false when one of the child pids' procfs metrics is unavailable. The methods computePid, computePageSize, and getChildPids already have this behavior.
[jira] [Created] (SPARK-34844) JDBCRelation columnPartition function includes the first stride in the lower partition
Jason Yarbrough created SPARK-34844: --- Summary: JDBCRelation columnPartition function includes the first stride in the lower partition Key: SPARK-34844 URL: https://issues.apache.org/jira/browse/SPARK-34844 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Jason Yarbrough Currently, columnPartition in JDBCRelation contains logic that adds the first stride into the lower partition. Because of this, the lower bound isn't used as the ceiling for the lower partition. For example, say we have data 0-10, 10 partitions, and the lowerBound is set to 1. The lower/first partition should contain anything < 1. However, in the current implementation, it would include anything < 2. A possible easy fix would be changing the following code on line 132: currentValue += stride To: if (i != 0) currentValue += stride Or include currentValue += stride within the if statement on line 131... although this creates a pretty nasty looking side-effect. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
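The boundary walk described above can be modeled with a simplified Python sketch (the real code is Scala, in JDBCRelation.columnPartition; the predicate strings and stride formula here are simplified illustrations, not Spark's exact output):

```python
def where_clauses(lower, upper, num_partitions, column="c", fixed=False):
    """Simplified model of columnPartition's boundary walk.
    fixed=True applies the proposed `if (i != 0) currentValue += stride`."""
    stride = upper // num_partitions - lower // num_partitions
    clauses, current = [], lower
    for i in range(num_partitions):
        lb = f"{column} >= {current}" if i != 0 else None
        if i != 0 or not fixed:   # current code always adds the first stride
            current += stride     # proposed code skips it for partition 0
        ub = f"{column} < {current}" if i != num_partitions - 1 else None
        clauses.append(" AND ".join(p for p in (lb, ub) if p))
    return clauses
```

With the example from the report (bounds 1 to 10, 10 partitions), the current behavior makes the first partition `c < 2`, while the proposed change makes it `c < 1`, so the lowerBound is actually used as the ceiling of the first partition.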
[jira] [Created] (SPARK-34843) JDBCRelation columnPartition function improperly determines stride size
Jason Yarbrough created SPARK-34843: --- Summary: JDBCRelation columnPartition function improperly determines stride size Key: SPARK-34843 URL: https://issues.apache.org/jira/browse/SPARK-34843 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Jason Yarbrough Currently, in JDBCRelation (line 123), the stride size is calculated as follows: val stride: Long = upperBound / numPartitions - lowerBound / numPartitions Due to truncation happening on both divisions, the stride size can fall short of what it should be. This can lead to a big difference between the provided upper bound and the actual start of the last partition. I propose this formula, as it is much more accurate and leads to better distribution: val stride: Long = (upperBound - lowerBound) / (numPartitions - 1) -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
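The truncation effect is easy to see numerically; this Python sketch assumes nonnegative bounds, where Python's `//` matches Scala's truncating integer division (the bound values below are made up for illustration):

```python
def stride_current(lower: int, upper: int, n: int) -> int:
    # Current formula: each division truncates separately, losing width.
    return upper // n - lower // n

def stride_proposed(lower: int, upper: int, n: int) -> int:
    # Proposed formula: subtract first, then divide once.
    return (upper - lower) // (n - 1)
```

For lowerBound=2000, upperBound=9999, numPartitions=10: the current formula gives stride 799, so the last partition starts at 2000 + 9*799 = 9191, about 800 short of the upper bound; the proposed formula gives 888, placing the last partition's start at 9992, much closer to the provided upperBound.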
[jira] [Commented] (SPARK-34674) Spark app on k8s doesn't terminate without call to sparkContext.stop() method
[ https://issues.apache.org/jira/browse/SPARK-34674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307398#comment-17307398 ] Sergey Kotlov commented on SPARK-34674: --- I saw it in the sources. But you are right; generally I should not rely on functionality that is not described in the official documentation. > Spark app on k8s doesn't terminate without call to sparkContext.stop() method > - > > Key: SPARK-34674 > URL: https://issues.apache.org/jira/browse/SPARK-34674 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 3.1.1 >Reporter: Sergey Kotlov >Priority: Major > > Hello! > I have run into a problem: if I don't call the method > sparkContext.stop() explicitly, the Spark driver process doesn't terminate > even after its Main method has completed. This behaviour is different > from Spark on YARN, where stopping the sparkContext manually is not required. > It looks like the problem is the use of non-daemon threads, which prevent the > driver JVM process from terminating. > At least I see two non-daemon threads if I don't call sparkContext.stop(): > {code:java} > Thread[OkHttp kubernetes.default.svc,5,main] > Thread[OkHttp kubernetes.default.svc Writer,5,main] > {code} > Could you please tell if it is possible to solve this problem? > The Docker image from the official spark-3.1.1 hadoop3.2 release is used.
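The mechanism behind the report is generic: like the JVM, CPython only exits once every non-daemon thread has finished, which is why the non-daemon OkHttp threads keep the driver alive until SparkContext.stop() shuts them down. A small Python analogue (not Spark code) demonstrates the effect:

```python
# A process whose "main" finishes immediately still lingers if it started a
# non-daemon worker thread; with a daemon thread it exits right away.
import subprocess
import sys
import textwrap

def process_exits_promptly(daemon: bool, timeout: float = 5.0) -> bool:
    """Start a child whose main thread ends at once while a worker sleeps;
    report whether the child process exited within `timeout` seconds."""
    prog = textwrap.dedent(f"""
        import threading, time
        threading.Thread(target=time.sleep, args=(60,), daemon={daemon}).start()
    """)
    try:
        subprocess.run([sys.executable, "-c", prog], timeout=timeout)
        return True
    except subprocess.TimeoutExpired:
        return False  # non-daemon worker kept the process alive
```

In application code, the usual defensive pattern is a try/finally around the job body with the context stopped in the finally block, so the driver terminates even on failure paths.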
[jira] [Commented] (SPARK-34684) Hadoop config could not be successfully serialized from driver pods to executor pods
[ https://issues.apache.org/jira/browse/SPARK-34684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307392#comment-17307392 ] Attila Zsolt Piros commented on SPARK-34684: > Have you tried to create a POD from a simple linux image with hadoop client > tools and access HDFS from the command line? > Hadoop config could not be successfully serialized from driver pods to > executor pods > --- > > Key: SPARK-34684 > URL: https://issues.apache.org/jira/browse/SPARK-34684 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 3.0.1, 3.0.2 >Reporter: Yue Peng >Priority: Major > > I have set HADOOP_CONF_DIR correctly, and I have verified that the hadoop configs > have been stored into a configmap and mounted to the driver. However, the spark pi > example job keeps failing because the executors do not know how to talk to HDFS. I > highly suspect that there is a bug causing it, as manually creating a > configmap storing the hadoop configs and mounting it to the executor in the template file > fixes the error. 
> > Spark submit command: > /opt/spark-3.0/bin/spark-submit --class org.apache.spark.examples.SparkPi > --deploy-mode cluster --master k8s://https://10.***.18.96:6443 > --num-executors 1 --conf spark.kubernetes.namespace=test --conf > spark.kubernetes.container.image= --conf > spark.kubernetes.driver.podTemplateFile=/opt/spark-3.0/conf/spark-driver.template > --conf > spark.kubernetes.executor.podTemplateFile=/opt/spark-3.0/conf/spark-executor.template > --conf spark.kubernetes.file.upload.path=/opt/spark-3.0/examples/jars > hdfs:///tmp/spark-examples_2.12-3.0.125067.jar 1000 > > > Error log: > > 21/03/10 06:59:58 INFO TransportClientFactory: Successfully created > connection to > org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc/100.64.0.191:7078 > after 608 ms (392 ms spent in bootstraps) > 21/03/10 06:59:58 INFO SecurityManager: Changing view acls to: root > 21/03/10 06:59:58 INFO SecurityManager: Changing modify acls to: root > 21/03/10 06:59:58 INFO SecurityManager: Changing view acls groups to: > 21/03/10 06:59:58 INFO SecurityManager: Changing modify acls groups to: > 21/03/10 06:59:58 INFO SecurityManager: SecurityManager: authentication > enabled; ui acls disabled; users with view permissions: Set(root); groups > with view permissions: Set(); users with modify permissions: Set(root); > groups with modify permissions: Set() > 21/03/10 06:59:59 INFO TransportClientFactory: Successfully created > connection to > org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc/100.64.0.191:7078 > after 130 ms (104 ms spent in bootstraps) > 21/03/10 06:59:59 INFO DiskBlockManager: Created local directory at > /var/data/spark-0f541e3d-994f-4c7a-843f-f7dac57dfc13/blockmgr-981cfb62-5b27-4d1a-8fbd-eddb466faf1d > 21/03/10 06:59:59 INFO MemoryStore: MemoryStore started with capacity 2047.2 > MiB > 21/03/10 06:59:59 INFO CoarseGrainedExecutorBackend: Connecting to driver: > 
spark://coarsegrainedschedu...@org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc:7078 > 21/03/10 06:59:59 INFO ResourceUtils: > == > 21/03/10 06:59:59 INFO ResourceUtils: Resources for spark.executor: > 21/03/10 06:59:59 INFO ResourceUtils: > == > 21/03/10 06:59:59 INFO CoarseGrainedExecutorBackend: Successfully registered > with driver > 21/03/10 06:59:59 INFO Executor: Starting executor ID 1 on host 100.64.0.192 > 21/03/10 07:00:00 INFO Utils: Successfully started service > 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37956. > 21/03/10 07:00:00 INFO NettyBlockTransferService: Server created on > 100.64.0.192:37956 > 21/03/10 07:00:00 INFO BlockManager: Using > org.apache.spark.storage.RandomBlockReplicationPolicy for block replication > policy > 21/03/10 07:00:00 INFO BlockManagerMaster: Registering BlockManager > BlockManagerId(1, 100.64.0.192, 37956, None) > 21/03/10 07:00:00 INFO BlockManagerMaster: Registered BlockManager > BlockManagerId(1, 100.64.0.192, 37956, None) > 21/03/10 07:00:00 INFO BlockManager: Initialized BlockManager: > BlockManagerId(1, 100.64.0.192, 37956, None) > 21/03/10 07:00:01 INFO CoarseGrainedExecutorBackend: Got assigned task 0 > 21/03/10 07:00:01 INFO CoarseGrainedExecutorBackend: Got assigned task 1 > 21/03/10 07:00:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) > 21/03/10 07:00:01 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) > 21/03/10 07:00:01 INFO Executor: Fetching > spark://org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc:7078/jars/spark-examples_2.12-3.0.125067.jar > with timestamp
[jira] [Commented] (SPARK-34684) Hadoop config could not be successfully serialized from driver pods to executor pods
[ https://issues.apache.org/jira/browse/SPARK-34684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307375#comment-17307375 ] shanyu zhao commented on SPARK-34684: - [~attilapiros] What if we want to connect to HDFS HA with something like hdfs://mycluster/...? We need to deliver hdfs-site.xml to the driver or executor pods, right? Also, we'd like to control the storage client's (Hadoop file system) behavior with the configuration file. > Hadoop config could not be successfully serialized from driver pods to > executor pods > --- > > Key: SPARK-34684 > URL: https://issues.apache.org/jira/browse/SPARK-34684 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 3.0.1, 3.0.2 >Reporter: Yue Peng >Priority: Major > > I have set HADOOP_CONF_DIR correctly, and I have verified that the hadoop configs > have been stored into a configmap and mounted to the driver. However, the spark pi > example job keeps failing because the executors do not know how to talk to HDFS. I > highly suspect that there is a bug causing it, as manually creating a > configmap storing the hadoop configs and mounting it to the executor in the template file > fixes the error. 
> > Spark submit command: > /opt/spark-3.0/bin/spark-submit --class org.apache.spark.examples.SparkPi > --deploy-mode cluster --master k8s://https://10.***.18.96:6443 > --num-executors 1 --conf spark.kubernetes.namespace=test --conf > spark.kubernetes.container.image= --conf > spark.kubernetes.driver.podTemplateFile=/opt/spark-3.0/conf/spark-driver.template > --conf > spark.kubernetes.executor.podTemplateFile=/opt/spark-3.0/conf/spark-executor.template > --conf spark.kubernetes.file.upload.path=/opt/spark-3.0/examples/jars > hdfs:///tmp/spark-examples_2.12-3.0.125067.jar 1000 > > > Error log: > > 21/03/10 06:59:58 INFO TransportClientFactory: Successfully created > connection to > org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc/100.64.0.191:7078 > after 608 ms (392 ms spent in bootstraps) > 21/03/10 06:59:58 INFO SecurityManager: Changing view acls to: root > 21/03/10 06:59:58 INFO SecurityManager: Changing modify acls to: root > 21/03/10 06:59:58 INFO SecurityManager: Changing view acls groups to: > 21/03/10 06:59:58 INFO SecurityManager: Changing modify acls groups to: > 21/03/10 06:59:58 INFO SecurityManager: SecurityManager: authentication > enabled; ui acls disabled; users with view permissions: Set(root); groups > with view permissions: Set(); users with modify permissions: Set(root); > groups with modify permissions: Set() > 21/03/10 06:59:59 INFO TransportClientFactory: Successfully created > connection to > org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc/100.64.0.191:7078 > after 130 ms (104 ms spent in bootstraps) > 21/03/10 06:59:59 INFO DiskBlockManager: Created local directory at > /var/data/spark-0f541e3d-994f-4c7a-843f-f7dac57dfc13/blockmgr-981cfb62-5b27-4d1a-8fbd-eddb466faf1d > 21/03/10 06:59:59 INFO MemoryStore: MemoryStore started with capacity 2047.2 > MiB > 21/03/10 06:59:59 INFO CoarseGrainedExecutorBackend: Connecting to driver: > 
spark://coarsegrainedschedu...@org-apache-spark-examples-sparkpi-0e58b6781aeef2d5-driver-svc.test.svc:7078 > 21/03/10 06:59:59 INFO ResourceUtils: > == > 21/03/10 06:59:59 INFO ResourceUtils: Resources for spark.executor: > 21/03/10 06:59:59 INFO ResourceUtils: > == > 21/03/10 06:59:59 INFO CoarseGrainedExecutorBackend: Successfully registered > with driver > 21/03/10 06:59:59 INFO Executor: Starting executor ID 1 on host 100.64.0.192 > 21/03/10 07:00:00 INFO Utils: Successfully started service > 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37956. > 21/03/10 07:00:00 INFO NettyBlockTransferService: Server created on > 100.64.0.192:37956 > 21/03/10 07:00:00 INFO BlockManager: Using > org.apache.spark.storage.RandomBlockReplicationPolicy for block replication > policy > 21/03/10 07:00:00 INFO BlockManagerMaster: Registering BlockManager > BlockManagerId(1, 100.64.0.192, 37956, None) > 21/03/10 07:00:00 INFO BlockManagerMaster: Registered BlockManager > BlockManagerId(1, 100.64.0.192, 37956, None) > 21/03/10 07:00:00 INFO BlockManager: Initialized BlockManager: > BlockManagerId(1, 100.64.0.192, 37956, None) > 21/03/10 07:00:01 INFO CoarseGrainedExecutorBackend: Got assigned task 0 > 21/03/10 07:00:01 INFO CoarseGrainedExecutorBackend: Got assigned task 1 > 21/03/10 07:00:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) > 21/03/10 07:00:01 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) > 21/03/10 07:00:01 INFO Executor: Fetching >
[jira] [Commented] (SPARK-34297) Add metrics for data loss and offset out range for KafkaMicroBatchStream
[ https://issues.apache.org/jira/browse/SPARK-34297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307321#comment-17307321 ] Apache Spark commented on SPARK-34297: -- User 'yijiacui-db' has created a pull request for this issue: https://github.com/apache/spark/pull/31944 > Add metrics for data loss and offset out range for KafkaMicroBatchStream > > > Key: SPARK-34297 > URL: https://issues.apache.org/jira/browse/SPARK-34297 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Assignee: L. C. Hsieh >Priority: Major > > When testing SS, I found it is hard to track data loss when SS reads from > Kafka. The micro scan node has only one metric, number of output rows. Users > have no idea how many times the offsets to fetch are out of range in Kafka, or how many > times data loss happens. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33482) V2 Datasources that extend FileScan preclude exchange reuse
[ https://issues.apache.org/jira/browse/SPARK-33482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-33482: -- Fix Version/s: (was: 3.0.3) > V2 Datasources that extend FileScan preclude exchange reuse > --- > > Key: SPARK-33482 > URL: https://issues.apache.org/jira/browse/SPARK-33482 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1 >Reporter: Bruce Robbins >Assignee: Peter Toth >Priority: Major > Fix For: 3.2.0, 3.1.2 > > > Sample query: > {noformat} > spark.read.parquet("tbl").createOrReplaceTempView("tbl") > spark.read.parquet("lookup").createOrReplaceTempView("lookup") > sql(""" >select tbl.col1, fk1, fk2 >from tbl, lookup l1, lookup l2 >where fk1 = l1.key >and fk2 = l2.key > """).explain > {noformat} > Test files can be created as so: > {noformat} > import scala.util.Random > val rand = Random > val tbl = spark.range(1, 1).map { x => > (rand.nextLong.abs % 20, >rand.nextLong.abs % 20, >x) > }.toDF("fk1", "fk2", "col1") > tbl.write.mode("overwrite").parquet("tbl") > val lookup = spark.range(0, 20).map { x => > (x + 1, x * 1, (x + 1) * 1) > }.toDF("key", "col1", "col2") > lookup.write.mode("overwrite").parquet("lookup") > {noformat} > Output with V1 Parquet reader: > {noformat} > == Physical Plan == > *(3) Project [col1#2L, fk1#0L, fk2#1L] > +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false >:- *(3) Project [fk1#0L, fk2#1L, col1#2L] >: +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false >: :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L)) >: : +- *(3) ColumnarToRow >: : +- FileScan parquet [fk1#0L,fk2#1L,col1#2L] Batched: true, > DataFilters: [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: Parquet, > Location: InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], > PartitionFilters: [], PushedFilters: [IsNotNull(fk1), IsNotNull(fk2)], > ReadSchema: struct >: +- BroadcastExchange 
HashedRelationBroadcastMode(List(input[0, > bigint, false]),false), [id=#75] >:+- *(1) Filter isnotnull(key#6L) >: +- *(1) ColumnarToRow >: +- FileScan parquet [key#6L] Batched: true, DataFilters: > [isnotnull(key#6L)], Format: Parquet, Location: > InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], > PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: > struct >+- ReusedExchange [key#12L], BroadcastExchange > HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75] > {noformat} > With V1 Parquet reader, the exchange for lookup is reused (see last line). > Output with V2 Parquet reader (spark.sql.sources.useV1SourceList=""): > {noformat} > == Physical Plan == > *(3) Project [col1#2L, fk1#0L, fk2#1L] > +- *(3) BroadcastHashJoin [fk2#1L], [key#12L], Inner, BuildRight, false >:- *(3) Project [fk1#0L, fk2#1L, col1#2L] >: +- *(3) BroadcastHashJoin [fk1#0L], [key#6L], Inner, BuildRight, false >: :- *(3) Filter (isnotnull(fk1#0L) AND isnotnull(fk2#1L)) >: : +- *(3) ColumnarToRow >: : +- BatchScan[fk1#0L, fk2#1L, col1#2L] ParquetScan DataFilters: > [isnotnull(fk1#0L), isnotnull(fk2#1L)], Format: parquet, Location: > InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/tbl], > PartitionFilters: [], PushedFilers: [IsNotNull(fk1), IsNotNull(fk2)], > ReadSchema: struct, PushedFilters: > [IsNotNull(fk1), IsNotNull(fk2)] >: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, > bigint, false]),false), [id=#75] >:+- *(1) Filter isnotnull(key#6L) >: +- *(1) ColumnarToRow >: +- BatchScan[key#6L] ParquetScan DataFilters: > [isnotnull(key#6L)], Format: parquet, Location: > InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], > PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: > struct, PushedFilters: [IsNotNull(key)] >+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, > false]),false), [id=#83] > +- *(2) Filter isnotnull(key#12L) > +- *(2) ColumnarToRow > +- 
BatchScan[key#12L] ParquetScan DataFilters: > [isnotnull(key#12L)], Format: parquet, Location: > InMemoryFileIndex[file:/Users/bruce/github/spark_upstream/lookup], > PartitionFilters: [], PushedFilers: [IsNotNull(key)], ReadSchema: > struct, PushedFilters: [IsNotNull(key)] > {noformat} > With the V2 Parquet reader, the exchange for lookup is not reused (see last 4 > lines). > You can see the same issue with the Orc reader (and I assume any other > datasource that extends FileScan). > The issue appears to be this check in
[jira] [Updated] (SPARK-34756) Fix FileScan equality check
[ https://issues.apache.org/jira/browse/SPARK-34756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-34756: -- Fix Version/s: (was: 3.0.3) > Fix FileScan equality check > --- > > Key: SPARK-34756 > URL: https://issues.apache.org/jira/browse/SPARK-34756 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1 >Reporter: Peter Toth >Assignee: Peter Toth >Priority: Major > Fix For: 3.2.0, 3.1.2 > > > `&&` is missing from `FileScan.equals()`. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
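The one-character nature of this bug is easy to miss, so here is a minimal, hypothetical Scala sketch (not Spark's actual FileScan code; `Scan`, `buggyEquals`, and `fixedEquals` are illustrative names) of how a missing `&&` in a chained equality check turns an earlier comparison into a discarded statement:

```scala
// Hypothetical stand-in for a scan node; names are illustrative only.
case class Scan(path: String, filters: Seq[String]) {
  // Buggy variant: the path comparison on the first line is evaluated and
  // discarded, so only the filters comparison determines the result.
  def buggyEquals(other: Scan): Boolean = {
    this.path == other.path // missing && -- result silently dropped
    this.filters == other.filters
  }

  // Fixed variant: both comparisons participate in the result.
  def fixedEquals(other: Scan): Boolean =
    this.path == other.path && this.filters == other.filters
}
```

With the buggy variant, two scans over different paths but identical filters compare equal, and two otherwise identical scans can compare unequal depending on which comparison is last; either kind of wrong answer can confuse plan comparisons such as exchange reuse.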
[jira] [Updated] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema
[ https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-34842: -- Reporter: Takeshi Yamamuro (was: Dongjoon Hyun) > Corrects the type of date_dim.d_quarter_name in the TPCDS schema > > > Key: SPARK-34842 > URL: https://issues.apache.org/jira/browse/SPARK-34842 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.1 >Reporter: Takeshi Yamamuro >Assignee: Takeshi Yamamuro >Priority: Major > Fix For: 3.2.0, 3.1.2 > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema
[ https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun reassigned SPARK-34842: - Assignee: Takeshi Yamamuro > Corrects the type of date_dim.d_quarter_name in the TPCDS schema > > > Key: SPARK-34842 > URL: https://issues.apache.org/jira/browse/SPARK-34842 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.1 >Reporter: Dongjoon Hyun >Assignee: Takeshi Yamamuro >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema
[ https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-34842. --- Fix Version/s: 3.1.2 3.2.0 Resolution: Fixed Issue resolved by pull request 31943 [https://github.com/apache/spark/pull/31943] > Corrects the type of date_dim.d_quarter_name in the TPCDS schema > > > Key: SPARK-34842 > URL: https://issues.apache.org/jira/browse/SPARK-34842 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.1 >Reporter: Dongjoon Hyun >Assignee: Takeshi Yamamuro >Priority: Major > Fix For: 3.2.0, 3.1.2 > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema
[ https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307253#comment-17307253 ] Apache Spark commented on SPARK-34842: -- User 'maropu' has created a pull request for this issue: https://github.com/apache/spark/pull/31943 > Corrects the type of date_dim.d_quarter_name in the TPCDS schema > > > Key: SPARK-34842 > URL: https://issues.apache.org/jira/browse/SPARK-34842 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.1 >Reporter: Dongjoon Hyun >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema
[ https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-34842: Assignee: (was: Apache Spark) > Corrects the type of date_dim.d_quarter_name in the TPCDS schema > > > Key: SPARK-34842 > URL: https://issues.apache.org/jira/browse/SPARK-34842 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.1 >Reporter: Dongjoon Hyun >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema
[ https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307254#comment-17307254 ] Apache Spark commented on SPARK-34842: -- User 'maropu' has created a pull request for this issue: https://github.com/apache/spark/pull/31943 > Corrects the type of date_dim.d_quarter_name in the TPCDS schema > > > Key: SPARK-34842 > URL: https://issues.apache.org/jira/browse/SPARK-34842 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.1 >Reporter: Dongjoon Hyun >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema
[ https://issues.apache.org/jira/browse/SPARK-34842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-34842: Assignee: Apache Spark > Corrects the type of date_dim.d_quarter_name in the TPCDS schema > > > Key: SPARK-34842 > URL: https://issues.apache.org/jira/browse/SPARK-34842 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.2.0, 3.1.1 >Reporter: Dongjoon Hyun >Assignee: Apache Spark >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34842) Corrects the type of date_dim.d_quarter_name in the TPCDS schema
Dongjoon Hyun created SPARK-34842: - Summary: Corrects the type of date_dim.d_quarter_name in the TPCDS schema Key: SPARK-34842 URL: https://issues.apache.org/jira/browse/SPARK-34842 Project: Spark Issue Type: Bug Components: SQL, Tests Affects Versions: 3.1.1, 3.2.0 Reporter: Dongjoon Hyun -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34841) Push ANSI interval binary expressions into (if / case) branches
[ https://issues.apache.org/jira/browse/SPARK-34841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307236#comment-17307236 ] Max Gekk commented on SPARK-34841: -- See https://github.com/apache/spark/pull/30955#discussion_r599678286 > Push ANSI interval binary expressions into (if / case) branches > > > Key: SPARK-34841 > URL: https://issues.apache.org/jira/browse/SPARK-34841 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.2.0 >Reporter: Max Gekk >Priority: Major > > Add DateAddYMInterval, TimestampAddYMInterval and TimeAdd to > PushFoldableIntoBranches.supportedBinaryExpression + tests. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34841) Push ANSI interval binary expressions into (if / case) branches
Max Gekk created SPARK-34841: Summary: Push ANSI interval binary expressions into (if / case) branches Key: SPARK-34841 URL: https://issues.apache.org/jira/browse/SPARK-34841 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.2.0 Reporter: Max Gekk Add DateAddYMInterval, TimestampAddYMInterval and TimeAdd to PushFoldableIntoBranches.supportedBinaryExpression + tests. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-34824) Multiply year-month interval by numeric
[ https://issues.apache.org/jira/browse/SPARK-34824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk resolved SPARK-34824. -- Fix Version/s: 3.2.0 Resolution: Fixed Issue resolved by pull request 31929 [https://github.com/apache/spark/pull/31929] > Multiply year-month interval by numeric > --- > > Key: SPARK-34824 > URL: https://issues.apache.org/jira/browse/SPARK-34824 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.2.0 >Reporter: Max Gekk >Assignee: Max Gekk >Priority: Major > Fix For: 3.2.0 > > > Support the multiply op over year-month interval by numeric types including: > # ByteType > # ShortType > # IntegerType > # LongType > # FloatType > # DoubleType > # DecimalType -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
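For intuition, a year-month interval is internally a count of months, so multiplying it by a numeric scales that count. A simplified sketch of the semantics (the rounding behavior here is an assumption for illustration, not Spark's exact overflow and rounding rules):

```scala
// A year-month interval modeled as its total number of months,
// e.g. INTERVAL '1-2' YEAR TO MONTH == 14 months.
def multiplyYearMonth(months: Int, factor: Double): Int =
  math.rint(months * factor).toInt
```

Under this model, `INTERVAL '1-2'` (14 months) times 2 yields 28 months, i.e. `INTERVAL '2-4'`.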
[jira] [Commented] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed
[ https://issues.apache.org/jira/browse/SPARK-34840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307198#comment-17307198 ] Apache Spark commented on SPARK-34840: -- User 'otterc' has created a pull request for this issue: https://github.com/apache/spark/pull/31934 > Fix cases of corruption in merged shuffle blocks that are pushed > > > Key: SPARK-34840 > URL: https://issues.apache.org/jira/browse/SPARK-34840 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and > merges them was introduced in > [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 > scenarios where the merged blocks get corrupted: > # {{StreamCallback.onFailure()}} is called more than once. Initially we > assumed that the onFailure callback will be called just once per stream. > However, we observed that this is called twice when a client connection is > reset. When the client connection is reset then there are 2 events that get > triggered in this order. > * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. > {{StreamInterceptor.exceptionCaught()}} invokes > {{callback.onFailure(streamId, cause)}}. This is the first time > StreamCallback.onFailure() will be invoked. > * {{channelInactive}}. Since the channel closes, the {{channelInactive}} > event gets triggered which again is propagated to {{StreamInterceptor}}. > {{StreamInterceptor.channelInactive()}} invokes > {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the > second time StreamCallback.onFailure() will be invoked. > # The flag {{isWriting}} is set prematurely to true. This introduces an edge > case where a stream that is trying to merge a duplicate block (created > because of a speculative task) may interfere with an active stream if the > duplicate stream fails. 
> Also adding additional changes that improve the code. > # Using positional writes all the time because this simplifies the code and > with microbenchmarking haven't seen any performance impact. > # Additional minor changes. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed
[ https://issues.apache.org/jira/browse/SPARK-34840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-34840: Assignee: (was: Apache Spark) > Fix cases of corruption in merged shuffle blocks that are pushed > > > Key: SPARK-34840 > URL: https://issues.apache.org/jira/browse/SPARK-34840 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and > merges them was introduced in > [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 > scenarios where the merged blocks get corrupted: > # {{StreamCallback.onFailure()}} is called more than once. Initially we > assumed that the onFailure callback will be called just once per stream. > However, we observed that this is called twice when a client connection is > reset. When the client connection is reset then there are 2 events that get > triggered in this order. > * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. > {{StreamInterceptor.exceptionCaught()}} invokes > {{callback.onFailure(streamId, cause)}}. This is the first time > StreamCallback.onFailure() will be invoked. > * {{channelInactive}}. Since the channel closes, the {{channelInactive}} > event gets triggered which again is propagated to {{StreamInterceptor}}. > {{StreamInterceptor.channelInactive()}} invokes > {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the > second time StreamCallback.onFailure() will be invoked. > # The flag {{isWriting}} is set prematurely to true. This introduces an edge > case where a stream that is trying to merge a duplicate block (created > because of a speculative task) may interfere with an active stream if the > duplicate stream fails. > Also adding additional changes that improve the code. 
> # Using positional writes all the time because this simplifies the code and > with microbenchmarking haven't seen any performance impact. > # Additional minor changes. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed
[ https://issues.apache.org/jira/browse/SPARK-34840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307197#comment-17307197 ] Apache Spark commented on SPARK-34840: -- User 'otterc' has created a pull request for this issue: https://github.com/apache/spark/pull/31934 > Fix cases of corruption in merged shuffle blocks that are pushed > > > Key: SPARK-34840 > URL: https://issues.apache.org/jira/browse/SPARK-34840 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and > merges them was introduced in > [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 > scenarios where the merged blocks get corrupted: > # {{StreamCallback.onFailure()}} is called more than once. Initially we > assumed that the onFailure callback will be called just once per stream. > However, we observed that this is called twice when a client connection is > reset. When the client connection is reset then there are 2 events that get > triggered in this order. > * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. > {{StreamInterceptor.exceptionCaught()}} invokes > {{callback.onFailure(streamId, cause)}}. This is the first time > StreamCallback.onFailure() will be invoked. > * {{channelInactive}}. Since the channel closes, the {{channelInactive}} > event gets triggered which again is propagated to {{StreamInterceptor}}. > {{StreamInterceptor.channelInactive()}} invokes > {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the > second time StreamCallback.onFailure() will be invoked. > # The flag {{isWriting}} is set prematurely to true. This introduces an edge > case where a stream that is trying to merge a duplicate block (created > because of a speculative task) may interfere with an active stream if the > duplicate stream fails. 
> Also adding additional changes that improve the code. > # Using positional writes all the time because this simplifies the code and > with microbenchmarking haven't seen any performance impact. > # Additional minor changes. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed
[ https://issues.apache.org/jira/browse/SPARK-34840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-34840: Assignee: Apache Spark > Fix cases of corruption in merged shuffle blocks that are pushed > > > Key: SPARK-34840 > URL: https://issues.apache.org/jira/browse/SPARK-34840 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Assignee: Apache Spark >Priority: Major > > The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and > merges them was introduced in > [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 > scenarios where the merged blocks get corrupted: > # {{StreamCallback.onFailure()}} is called more than once. Initially we > assumed that the onFailure callback will be called just once per stream. > However, we observed that this is called twice when a client connection is > reset. When the client connection is reset then there are 2 events that get > triggered in this order. > * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. > {{StreamInterceptor.exceptionCaught()}} invokes > {{callback.onFailure(streamId, cause)}}. This is the first time > StreamCallback.onFailure() will be invoked. > * {{channelInactive}}. Since the channel closes, the {{channelInactive}} > event gets triggered which again is propagated to {{StreamInterceptor}}. > {{StreamInterceptor.channelInactive()}} invokes > {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the > second time StreamCallback.onFailure() will be invoked. > # The flag {{isWriting}} is set prematurely to true. This introduces an edge > case where a stream that is trying to merge a duplicate block (created > because of a speculative task) may interfere with an active stream if the > duplicate stream fails. > Also adding additional changes that improve the code. 
> # Using positional writes all the time because this simplifies the code and > with microbenchmarking haven't seen any performance impact. > # Additional minor changes. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed
Chandni Singh created SPARK-34840: - Summary: Fix cases of corruption in merged shuffle blocks that are pushed Key: SPARK-34840 URL: https://issues.apache.org/jira/browse/SPARK-34840 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 3.1.0 Reporter: Chandni Singh The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and merges them was introduced in [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 scenarios where the merged blocks get corrupted: # {{StreamCallback.onFailure()}} is called more than once. Initially we assumed that the onFailure callback will be called just once per stream. However, we observed that this is called twice when a client connection is reset. When the client connection is reset then there are 2 events that get triggered in this order. * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. {{StreamInterceptor.exceptionCaught()}} invokes {{callback.onFailure(streamId, cause)}}. This is the first time StreamCallback.onFailure() will be invoked. * {{channelInactive}}. Since the channel closes, the {{channelInactive}} event gets triggered which again is propagated to {{StreamInterceptor}}. {{StreamInterceptor.channelInactive()}} invokes {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the second time StreamCallback.onFailure() will be invoked. # The flag {{isWriting}} is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails. Also adding additional changes that improve the code. # Using positional writes all the time because this simplifies the code and with microbenchmarking haven't seen any performance impact. # Additional minor changes. 
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
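Independently of Spark's actual fix, the first failure mode above (onFailure delivered once for exceptionCaught and again for channelInactive on a connection reset) is the classic reason failure handlers are written idempotently. A minimal sketch, with illustrative names rather than Spark's real classes:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hedged sketch: a stream callback whose failure path runs at most once,
// even if the transport layer delivers two failure events for one reset.
class MergeStreamCallback {
  private val failed = new AtomicBoolean(false)
  var cleanupCount = 0 // exposed for the example; real code would release buffers, etc.

  def onFailure(streamId: String, cause: Throwable): Unit = {
    // compareAndSet wins for exactly one caller, so cleanup is not repeated.
    if (failed.compareAndSet(false, true)) {
      cleanupCount += 1
    }
  }
}
```

The design choice is to make the guard atomic rather than a plain boolean, since the two events can in principle arrive on different threads.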
[jira] [Created] (SPARK-34839) FileNotFoundException on _temporary when multiple apps write to the same table
Bimalendu Choudhary created SPARK-34839: --- Summary: FileNotFoundException on _temporary when multiple apps write to the same table Key: SPARK-34839 URL: https://issues.apache.org/jira/browse/SPARK-34839 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.0 Environment: CDH 6.2.1 Hadoop 3.0.0 Reporter: Bimalendu Choudhary When multiple Spark applications are writing to the same hive table (but different partitions, so not interfering with each other in any way), the application finishing first ends up deleting the parent _temporary directory which is still being used by the other applications. I think the temporary directory used by FileOutputCommitter should be made configurable, to let each caller supply its own unique value without having to worry about some other application deleting it unknowingly. Something like: {quote} public static final String PENDING_DIR_NAME_DEFAULT = "_temporary"; public static final String PENDING_DIR_NAME = "mapreduce.fileoutputcommitter.tempdir"; {quote} We can not use mapreduce.fileoutputcommitter.algorithm.version = 2 due to its issue with data loss https://issues.apache.org/jira/browse/MAPREDUCE-7282. There is a similar Jira https://issues.apache.org/jira/browse/SPARK-18883, which was not resolved. This is a very generic case of one Spark application interfering with the work of another Spark application on the same table, and it can be avoided by making the temporary directory unique or configurable. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
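The proposal amounts to resolving the pending-directory name from configuration instead of hard-coding it. A sketch of that lookup (the configuration key mirrors the one suggested in the report; it is not an existing Hadoop key):

```scala
// Resolve the committer's pending directory from configuration, falling back
// to the historical hard-coded "_temporary" when the key is unset.
def pendingDirName(conf: Map[String, String]): String =
  conf.getOrElse("mapreduce.fileoutputcommitter.tempdir", "_temporary")
```

Each application could then set a unique value (for example suffixed with its application id), so a job that finishes first only deletes its own pending directory.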
[jira] [Created] (SPARK-34838) consider markdownlint
Josh Soref created SPARK-34838: -- Summary: consider markdownlint Key: SPARK-34838 URL: https://issues.apache.org/jira/browse/SPARK-34838 Project: Spark Issue Type: Bug Components: Build, Documentation Affects Versions: 3.1.1 Reporter: Josh Soref There's apparently a tool called markdownlint which seems to be integrated w/ Visual Studio Code. It suggests, among other things, to use {{*}} instead of {{-}} for top level bullets: https://github.com/DavidAnson/markdownlint/blob/v0.23.1/doc/Rules.md#md004 https://github.com/apache/spark/pull/30679#issuecomment-804971370 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34837) Support ANSI SQL intervals by the aggregate function `avg`
[ https://issues.apache.org/jira/browse/SPARK-34837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Gekk updated SPARK-34837: - Description: Extend org.apache.spark.sql.catalyst.expressions.aggregate.Average to support DayTimeIntervalType and YearMonthIntervalType. (was: Extend org.apache.spark.sql.catalyst.expressions.aggregate.Sum to support DayTimeIntervalType and YearMonthIntervalType.) > Support ANSI SQL intervals by the aggregate function `avg` > -- > > Key: SPARK-34837 > URL: https://issues.apache.org/jira/browse/SPARK-34837 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.2.0 >Reporter: Max Gekk >Priority: Major > > Extend org.apache.spark.sql.catalyst.expressions.aggregate.Average to support > DayTimeIntervalType and YearMonthIntervalType. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34837) Support ANSI SQL intervals by the aggregate function `avg`
Max Gekk created SPARK-34837: Summary: Support ANSI SQL intervals by the aggregate function `avg` Key: SPARK-34837 URL: https://issues.apache.org/jira/browse/SPARK-34837 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.2.0 Reporter: Max Gekk Extend org.apache.spark.sql.catalyst.expressions.aggregate.Sum to support DayTimeIntervalType and YearMonthIntervalType. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
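For context, a day-time interval is physically stored as a number of microseconds, so averaging reduces to integer arithmetic on that representation. A stand-alone Python sketch of the intended semantics (an illustration only, not Spark's Average implementation):

```python
from datetime import timedelta

def avg_day_time_interval(intervals):
    # Sum the microsecond representations, then divide by the count,
    # mirroring avg = sum / count over the underlying long values.
    micros = [round(td.total_seconds() * 1_000_000) for td in intervals]
    return timedelta(microseconds=sum(micros) / len(micros))

print(avg_day_time_interval([timedelta(days=1), timedelta(days=2)]))
# 1 day, 12:00:00
```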
[jira] [Updated] (SPARK-34719) fail if the view query has duplicated column names
[ https://issues.apache.org/jira/browse/SPARK-34719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan updated SPARK-34719: Fix Version/s: 3.2.0 2.4.8 > fail if the view query has duplicated column names > -- > > Key: SPARK-34719 > URL: https://issues.apache.org/jira/browse/SPARK-34719 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.7, 3.0.0, 3.1.0, 3.1.1 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Blocker > Labels: correctness > Fix For: 2.4.8, 3.2.0, 3.1.2, 3.0.3 > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-34421) Custom functions can't be used in temporary views with CTEs
[ https://issues.apache.org/jira/browse/SPARK-34421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-34421. - Resolution: Fixed > Custom functions can't be used in temporary views with CTEs > --- > > Key: SPARK-34421 > URL: https://issues.apache.org/jira/browse/SPARK-34421 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.0 > Environment: Databricks Runtime 8.0 >Reporter: Lauri Koobas >Assignee: Peter Toth >Priority: Blocker > Fix For: 3.0.3, 3.1.1 > > > The following query works in Spark 3.0 but not in Spark 3.1. > > Start with: > {{spark.udf.registerJavaFunction("custom_func", > "com.stuff.path.custom_func", LongType())}} > > Works: * {{select custom_func()}} > * {{create temporary view blaah as select custom_func()}} > * {{with step_1 as ( select custom_func() ) select * from step_1}} > Broken: > {{create temporary view blaah as with step_1 as ( select custom_func() ) > select * from step_1}} > > followed by: > {{select * from blaah}} > > Error: > {{Error in SQL statement: AnalysisException: No handler for UDF/UDAF/UDTF > '}}{{com.stuff.path.custom_func}}{{';}} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34421) Custom functions can't be used in temporary views with CTEs
[ https://issues.apache.org/jira/browse/SPARK-34421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307101#comment-17307101 ] Wenchen Fan commented on SPARK-34421: - [~laurikoobas] It was an internal mistake that this fix didn't go into DBR 8.1, and it was already fixed 8 hours ago. Apache Spark has no problem at all. > Custom functions can't be used in temporary views with CTEs > --- > > Key: SPARK-34421 > URL: https://issues.apache.org/jira/browse/SPARK-34421 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.0 > Environment: Databricks Runtime 8.0 >Reporter: Lauri Koobas >Assignee: Peter Toth >Priority: Blocker > Fix For: 3.1.1, 3.0.3 > > > The following query works in Spark 3.0 but not in Spark 3.1. > > Start with: > {{spark.udf.registerJavaFunction("custom_func", > "com.stuff.path.custom_func", LongType())}} > > Works: * {{select custom_func()}} > * {{create temporary view blaah as select custom_func()}} > * {{with step_1 as ( select custom_func() ) select * from step_1}} > Broken: > {{create temporary view blaah as with step_1 as ( select custom_func() ) > select * from step_1}} > > followed by: > {{select * from blaah}} > > Error: > {{Error in SQL statement: AnalysisException: No handler for UDF/UDAF/UDTF > '}}{{com.stuff.path.custom_func}}{{';}} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34836) DataSourceV2Relation with column filter fails with ClassCastException at collectAsList
Markus Riedl-Ehrenleitner created SPARK-34836: - Summary: DataSourceV2Relation with column filter fails with ClassCastException at collectAsList Key: SPARK-34836 URL: https://issues.apache.org/jira/browse/SPARK-34836 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.1.1 Reporter: Markus Riedl-Ehrenleitner After trying to upgrade to 3.1.1, multiple of our test cases fail with a ClassCastException at *DataFrame.collectAsList()*. Produced exception: {noformat} java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to class org.apache.spark.sql.catalyst.expressions.UnsafeRow (org.apache.spark.sql.catalyst.expressions.GenericInternalRow and org.apache.spark.sql.catalyst.expressions.UnsafeRow are in unnamed module of loader 'app') at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346) ~[spark-sql_2.12-3.1.1.jar:3.1.1] at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) ~[spark-core_2.12-3.1.1.jar:3.1.1] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) [spark-core_2.12-3.1.1.jar:3.1.1] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at java.lang.Thread.run(Thread.java:834) [?:?] 2021-03-23 13:00:26.974 WARN [org.apache.spark.scheduler.TaskSetManager] Lost task 0.0 in stage 0.0 (TID 0) (192.168.0.6 executor driver): java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to class org.apache.spark.sql.catalyst.expressions.UnsafeRow (org.apache.spark.sql.catalyst.expressions.GenericInternalRow and org.apache.spark.sql.catalyst.expressions.UnsafeRow are in unnamed module of loader 'app') at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834){noformat} All test cases follow this pattern: {code:java} StructField aDouble = DataTypes.createStructField("aDouble", DataTypes.DoubleType, true); StructField aDoubleArray = DataTypes.createStructField("aDoubleArray", DataTypes.createArrayType(DataTypes.DoubleType), 
true); StructType schema = DataTypes.createStructType(Arrays.asList(aDouble, aDoubleArray)); Column aDoubleFilter = functions.column("aDouble").equalTo(functions.lit(1d)); Column aDoubleArrayFilter = functions.aggregate( functions.col("aDoubleArray"), functions.lit(false), (seed, column) -> seed.or(column.equalTo(functions.lit(1d)))); Column filter = aDoubleFilter.or(aDoubleArrayFilter); CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(new HashMap<>()); Dataset<Row> dataset = Dataset.ofRows(sparkSession, DataSourceV2Relation.create(new ReadTable(schema), null, null, options)); Dataset<Row> filtered = dataset.filter(filter); List<Row> collected = filtered.collectAsList(); {code} ReadTable is a *org.apache.spark.sql.connector.catalog.Table* with the given schema.
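The stack trace pins the failure to the cast in SparkPlan.getByteArrayRdd, which assumes every operator emits UnsafeRow. A minimal Python sketch of that failure class (the class names mirror the trace, but this is an illustration, not Spark's code):

```python
# Illustrative only: a collect path that unconditionally assumes the
# "unsafe" concrete row subtype breaks when an operator emits a generic
# row instead, which is the reported ClassCastException.
class InternalRow: pass
class UnsafeRow(InternalRow): pass
class GenericInternalRow(InternalRow): pass

def to_bytes(row: InternalRow) -> bytes:
    # Mirrors the cast performed while serializing rows for collect().
    if not isinstance(row, UnsafeRow):
        raise TypeError(f"{type(row).__name__} cannot be cast to UnsafeRow")
    return b"..."

try:
    to_bytes(GenericInternalRow())
except TypeError as e:
    print(e)  # GenericInternalRow cannot be cast to UnsafeRow
```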
[jira] [Created] (SPARK-34835) Support TimestampType in UDT
Darcy Shen created SPARK-34835: -- Summary: Support TimestampType in UDT Key: SPARK-34835 URL: https://issues.apache.org/jira/browse/SPARK-34835 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 3.1.1, 3.0.2 Reporter: Darcy Shen -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34830) Some UDF calls inside transform are broken
[ https://issues.apache.org/jira/browse/SPARK-34830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307055#comment-17307055 ] Pavel Chernikov commented on SPARK-34830: - Good catch! I tend to agree now, that the issues are related. > Some UDF calls inside transform are broken > -- > > Key: SPARK-34830 > URL: https://issues.apache.org/jira/browse/SPARK-34830 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Daniel Solow >Priority: Major > > Let's say I want to create a UDF to do a simple lookup on a string: > {code:java} > import org.apache.spark.sql.{functions => f} > val M = Map("a" -> "abc", "b" -> "defg") > val BM = spark.sparkContext.broadcast(M) > val LOOKUP = f.udf((s: String) => BM.value.get(s)) > {code} > Now if I have the following dataframe: > {code:java} > val df = Seq( > Tuple1(Seq("a", "b")) > ).toDF("arr") > {code} > and I want to run this UDF over each element in the array, I can do: > {code:java} > df.select(f.transform($"arr", i => LOOKUP(i)).as("arr")).show(false) > {code} > This should show: > {code:java} > +---+ > |arr| > +---+ > |[abc, defg]| > +---+ > {code} > However it actually shows: > {code:java} > +---+ > |arr| > +---+ > |[def, defg]| > +---+ > {code} > It's also broken for SQL (even without DSL). This gives the same result: > {code:java} > spark.udf.register("LOOKUP",(s: String) => BM.value.get(s)) > df.selectExpr("TRANSFORM(arr, a -> LOOKUP(a)) AS arr").show(false) > {code} > Note that "def" is not even in the map I'm using. > This is a big problem because it breaks existing code/UDFs. I noticed this > because the job I ported from 2.4.5 to 3.1.1 seemed to be working, but was > actually producing broken data. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
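The reported value `def` is exactly the first three bytes (the length of the expected result `abc`) of the neighbouring result `defg`, which is consistent with UDF results sharing one reused output buffer. A stand-alone Python sketch of that hypothesis (illustrative only; this is not Spark's actual code path):

```python
# Hypothetical buffer-reuse sketch: the first result keeps its original
# length (3, from "abc") but reads from a shared buffer that the second
# lookup overwrote with "defg", yielding "def" as reported above.
M = {"a": "abc", "b": "defg"}

buf = bytearray(8)          # one reused output buffer for every UDF call

def lookup_into(shared, key):
    value = M[key].encode()
    shared[: len(value)] = value
    return len(value)       # the caller remembers only the length

lengths = []
for key in ["a", "b"]:
    lengths.append(lookup_into(buf, key))   # bug: no copy of the bytes

decoded = [bytes(buf[:n]).decode() for n in lengths]
print(decoded)              # ['def', 'defg']
```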
[jira] [Resolved] (SPARK-34366) Add metric interfaces to DS v2
[ https://issues.apache.org/jira/browse/SPARK-34366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-34366. - Fix Version/s: 3.2.0 Resolution: Fixed Issue resolved by pull request 31476 [https://github.com/apache/spark/pull/31476] > Add metric interfaces to DS v2 > -- > > Key: SPARK-34366 > URL: https://issues.apache.org/jira/browse/SPARK-34366 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Assignee: L. C. Hsieh >Priority: Major > Fix For: 3.2.0 > > > Add a few public API change to DS v2, to make DS v2 scan can report metrics > to Spark. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34829) transform_values return identical values when it's used with udf that returns reference type
[ https://issues.apache.org/jira/browse/SPARK-34829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pavel Chernikov updated SPARK-34829: Summary: transform_values return identical values when it's used with udf that returns reference type (was: transform_values return identical values while operating on complex types) > transform_values return identical values when it's used with udf that returns > reference type > > > Key: SPARK-34829 > URL: https://issues.apache.org/jira/browse/SPARK-34829 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Pavel Chernikov >Priority: Major > > If return value of an {{udf}} that is passed to {{transform_values}} is an > {{AnyRef}}, then the transformation returns identical new values for each map > key (to be more precise, each newly obtained value overrides values for all > previously processed keys). > Consider following examples: > {code:java} > case class Bar(i: Int) > val square = udf((b: Bar) => b.i * b.i) > val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map") > df.withColumn("map_square", transform_values(col("map"), (_, v) => > square(v))).show(truncate = false) > +--++ > |map |map_square | > +--++ > |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}| > +--++ > {code} > vs > {code:java} > case class Bar(i: Int) > case class BarSquare(i: Int) > val square = udf((b: Bar) => BarSquare(b.i * b.i)) > val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map") > df.withColumn("map_square", transform_values(col("map"), (_, v) => > square(v))).show(truncate = false) > +--+--+ > |map |map_square | > +--+--+ > |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}| > +--+--+ > {code} > or even just this one > {code:java} > case class Foo(s: String) > val reverse = udf((f: Foo) => f.s.reverse) > val df = Seq(Map(1 -> Foo("abc"), 2 -> Foo("klm"), 3 -> > Foo("xyz"))).toDF("map") > df.withColumn("map_reverse", transform_values(col("map"), (_, 
v) => > reverse(v))).show(truncate = false) > ++--+ > |map |map_reverse | > ++--+ > |{1 -> {abc}, 2 -> {klm}, 3 -> {xyz}}|{1 -> zyx, 2 -> zyx, 3 -> zyx}| > ++--+ > {code} > After playing with > {{org.apache.spark.sql.catalyst.expressions.TransformValues}} it looks like > something wrong is happening while executing this line: > {code:java} > resultValues.update(i, functionForEval.eval(inputRow)){code} > To be more precise , it's all about {{functionForEval.eval(inputRow)}} , > because if you do something like this: > {code:java} > println(s"RESULTS PRIOR TO EVALUATION - $resultValues") > val resultValue = functionForEval.eval(inputRow) > println(s"RESULT - $resultValue") > println(s"RESULTS PRIOR TO UPDATE - $resultValues") > resultValues.update(i, resultValue) > println(s"RESULTS AFTER UPDATE - $resultValues"){code} > You'll see in the logs, something like: > {code:java} > RESULTS PRIOR TO EVALUATION - [null,null,null] > RESULT - [0,1] > RESULTS PRIOR TO UPDATE - [null,null,null] > RESULTS AFTER UPDATE - [[0,1],null,null] > -- > RESULTS PRIOR TO EVALUATION - [[0,1],null,null] > RESULT - [0,4] > RESULTS PRIOR TO UPDATE - [[0,4],null,null] > RESULTS AFTER UPDATE - [[0,4],[0,4],null] > -- > RESULTS PRIOR TO EVALUATION - [[0,4],[0,4],null] > RESULT - [0,9] > RESULTS PRIOR TO UPDATE - [[0,9],[0,9],null] > RESULTS AFTER UPDATE - [[0,9],[0,9],[0,9] > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
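The overwrite progression in the quoted log, where each newly evaluated value replaces every previously stored slot, is the classic signature of storing a reference into a buffer that `eval` keeps reusing. A stand-alone Python sketch of that pattern (no Spark involved; purely illustrative):

```python
# Storing a *reference* to a buffer that "eval" keeps reusing makes every
# previously stored slot mirror the latest result, as in the log above.
result = [None, None, None]
scratch = [0, 0]                 # reused mutable row, like an InternalRow

for i, v in enumerate([1, 2, 3]):
    scratch[1] = v * v           # "eval" writes into the shared buffer
    result[i] = scratch          # bug: stores the reference, not a copy

print(result)                    # [[0, 9], [0, 9], [0, 9]]
```

Copying the evaluated value before storing it (`result[i] = list(scratch)` here) yields the expected `[[0, 1], [0, 4], [0, 9]]`.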
[jira] [Updated] (SPARK-34829) transform_values return identical values while operating on complex types
[ https://issues.apache.org/jira/browse/SPARK-34829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pavel Chernikov updated SPARK-34829: Description: If return value of an {{udf}} that is passed to {{transform_values}} is an {{AnyRef}}, then the transformation returns identical new values for each map key (to be more precise, each newly obtained value overrides values for all previously processed keys). Consider following examples: {code:java} case class Bar(i: Int) val square = udf((b: Bar) => b.i * b.i) val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map") df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false) +--++ |map |map_square | +--++ |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}| +--++ {code} vs {code:java} case class Bar(i: Int) case class BarSquare(i: Int) val square = udf((b: Bar) => BarSquare(b.i * b.i)) val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map") df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false) +--+--+ |map |map_square | +--+--+ |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}| +--+--+ {code} or even just this one {code:java} case class Foo(s: String) val reverse = udf((f: Foo) => f.s.reverse) val df = Seq(Map(1 -> Foo("abc"), 2 -> Foo("klm"), 3 -> Foo("xyz"))).toDF("map") df.withColumn("map_reverse", transform_values(col("map"), (_, v) => reverse(v))).show(truncate = false) ++--+ |map |map_reverse | ++--+ |{1 -> {abc}, 2 -> {klm}, 3 -> {xyz}}|{1 -> zyx, 2 -> zyx, 3 -> zyx}| ++--+ {code} After playing with {{org.apache.spark.sql.catalyst.expressions.TransformValues}} it looks like something wrong is happening while executing this line: {code:java} resultValues.update(i, functionForEval.eval(inputRow)){code} To be more precise , it's all about {{functionForEval.eval(inputRow)}} , because if you do something like this: {code:java} println(s"RESULTS PRIOR TO EVALUATION - 
$resultValues") val resultValue = functionForEval.eval(inputRow) println(s"RESULT - $resultValue") println(s"RESULTS PRIOR TO UPDATE - $resultValues") resultValues.update(i, resultValue) println(s"RESULTS AFTER UPDATE - $resultValues"){code} You'll see in the logs something like: {code:java} RESULTS PRIOR TO EVALUATION - [null,null,null] RESULT - [0,1] RESULTS PRIOR TO UPDATE - [null,null,null] RESULTS AFTER UPDATE - [[0,1],null,null] -- RESULTS PRIOR TO EVALUATION - [[0,1],null,null] RESULT - [0,4] RESULTS PRIOR TO UPDATE - [[0,4],null,null] RESULTS AFTER UPDATE - [[0,4],[0,4],null] -- RESULTS PRIOR TO EVALUATION - [[0,4],[0,4],null] RESULT - [0,9] RESULTS PRIOR TO UPDATE - [[0,9],[0,9],null] RESULTS AFTER UPDATE - [[0,9],[0,9],[0,9]] {code} was: If map values are {{StructType}}s, then the behavior of {{transform_values}} is inconsistent (it may return identical values). To be more precise, it looks like it returns identical values if the return type is {{AnyRef}}. Consider the following examples: {code:java} case class Bar(i: Int) val square = udf((b: Bar) => b.i * b.i) val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map") df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false) +--++ |map |map_square | +--++ |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}| +--++ {code} vs {code:java} case class Bar(i: Int) case class BarSquare(i: Int) val square = udf((b: Bar) => BarSquare(b.i * b.i)) val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map") df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false) +--+--+ |map |map_square | +--+--+ |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}| +--+--+ {code} or even just this one {code:java} case class Foo(s: String) val reverse = udf((f: Foo) =>
[jira] [Commented] (SPARK-34083) Using TPCDS original definitions for char/varchar columns
[ https://issues.apache.org/jira/browse/SPARK-34083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307024#comment-17307024 ] Apache Spark commented on SPARK-34083: -- User 'maropu' has created a pull request for this issue: https://github.com/apache/spark/pull/31943 > Using TPCDS original definitions for char/varchar columns > - > > Key: SPARK-34083 > URL: https://issues.apache.org/jira/browse/SPARK-34083 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.0, 3.2.0 >Reporter: Kent Yao >Assignee: Kent Yao >Priority: Major > Fix For: 3.1.1 > > > Using TPCDS original definitions for char/varchar columns instead of the > modified string -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34833) Apply right-padding correctly for correlated subqueries
[ https://issues.apache.org/jira/browse/SPARK-34833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307000#comment-17307000 ] Takeshi Yamamuro commented on SPARK-34833: -- Ah, thanks, Dongjoon. I forgot to check the previous versions. > Apply right-padding correctly for correlated subqueries > --- > > Key: SPARK-34833 > URL: https://issues.apache.org/jira/browse/SPARK-34833 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.0, 3.2.0, 3.1.1 >Reporter: Takeshi Yamamuro >Priority: Blocker > Labels: correctness > > This ticket aims to fix the bug that right-padding is not applied for char > types inside correlated subqueries. > For example, the query below returns nothing in master, but the correct result > is `c`. > {code} > scala> sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING parquet") > scala> sql(s"CREATE TABLE t2(v VARCHAR(5), c CHAR(7)) USING parquet") > scala> sql("INSERT INTO t1 VALUES ('c', 'b')") > scala> sql("INSERT INTO t2 VALUES ('a', 'b')") > scala> val df = sql(""" > |SELECT v FROM t1 > |WHERE 'a' IN (SELECT v FROM t2 WHERE t2.c = t1.c )""".stripMargin) > scala> df.show() > +---+ > | v| > +---+ > +---+ > {code} > This is because `ApplyCharTypePadding` does not handle the case above to > apply right-padding into `'abc'`. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
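The padding rule at stake can be illustrated without Spark: CHAR(n) comparison is defined over right-padded values, so the CHAR(5) side must be padded to the CHAR(7) width before `t2.c = t1.c` can match. A stand-alone Python sketch (with a hypothetical `char_pad` helper, not Spark's implementation):

```python
# CHAR(n) stores values right-padded with spaces to width n; comparing a
# CHAR(5) value to a CHAR(7) value requires padding the narrower side.
def char_pad(s: str, n: int) -> str:
    return s.ljust(n)            # right-pad with spaces to width n

t1_c = char_pad("b", 5)          # value as stored for CHAR(5)
t2_c = char_pad("b", 7)          # value as stored for CHAR(7)

print(t1_c == t2_c)                  # False: widths differ
print(char_pad(t1_c, 7) == t2_c)     # True: equal after right-padding
```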
[jira] [Assigned] (SPARK-34834) There is a potential Netty memory leak in TransportResponseHandler.
[ https://issues.apache.org/jira/browse/SPARK-34834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-34834: Assignee: Apache Spark > There is a potential Netty memory leak in TransportResponseHandler. > --- > > Key: SPARK-34834 > URL: https://issues.apache.org/jira/browse/SPARK-34834 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.4.7, 3.0.2, 3.1.0, 3.1.1 >Reporter: weixiuli >Assignee: Apache Spark >Priority: Major > > There is a potential Netty memory leak in TransportResponseHandler. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-34834) There is a potential Netty memory leak in TransportResponseHandler.
[ https://issues.apache.org/jira/browse/SPARK-34834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-34834: Assignee: (was: Apache Spark) > There is a potential Netty memory leak in TransportResponseHandler. > --- > > Key: SPARK-34834 > URL: https://issues.apache.org/jira/browse/SPARK-34834 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.4.7, 3.0.2, 3.1.0, 3.1.1 >Reporter: weixiuli >Priority: Major > > There is a potential Netty memory leak in TransportResponseHandler. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34834) There is a potential Netty memory leak in TransportResponseHandler.
[ https://issues.apache.org/jira/browse/SPARK-34834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17306989#comment-17306989 ] Apache Spark commented on SPARK-34834: -- User 'weixiuli' has created a pull request for this issue: https://github.com/apache/spark/pull/31942 > There is a potential Netty memory leak in TransportResponseHandler. > --- > > Key: SPARK-34834 > URL: https://issues.apache.org/jira/browse/SPARK-34834 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.4.7, 3.0.2, 3.1.0, 3.1.1 >Reporter: weixiuli >Priority: Major > > There is a potential Netty memory leak in TransportResponseHandler. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org