[ 
https://issues.apache.org/jira/browse/SPARK-42910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704575#comment-17704575
 ] 

Maciej Szymkiewicz edited comment on SPARK-42910 at 3/24/23 11:15 AM:
----------------------------------------------------------------------

It is no longer generic, so that cannot be a problem. 

Additionally, the issue seems to disappear when classes are defined externally:

{code:python}
# foo.py

from abc import ABC
from typing import Generic, TypeVar, Callable

T = TypeVar("T")

class Foo:
    ...

class A(ABC, Generic[T]):
    base_record: Callable[..., T]

class B(A):
    base_record = Foo

class C(B):
    ...

def f(_: int) -> int:
    print(C.base_record)
    return 1
{code}

and then

{code:python}
from operator import add
from foo import C, f
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("schema_test")\
    .getOrCreate()

spark.sparkContext.parallelize(range(1, 100)).map(f).reduce(add) 
{code}

so it makes sense to focus further investigation on how we prepare 
locally defined classes for shipping over the wire.
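
As a possible starting point for that investigation, here is a minimal plain-Python sketch (no Spark involved) that records where {{base_record}} actually lives in the class hierarchy from the report. {{describe_base_record}} is a hypothetical helper introduced only for illustration; it is not part of PySpark.

```python
from abc import ABC
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class Foo:
    ...


class A(ABC, Generic[T]):
    # Annotation only: this records a type hint but assigns no value.
    base_record: Callable[..., T]


class B(A):
    # The actual class attribute is created here.
    base_record = Foo


class C(B):
    ...


def describe_base_record() -> dict:
    """Hypothetical helper: report where `base_record` is defined."""
    return {
        "annotated_on_A": "base_record" in A.__annotations__,
        "value_on_A": hasattr(A, "base_record"),
        "value_in_B_dict": "base_record" in vars(B),
        "value_in_C_dict": "base_record" in vars(C),
        "resolved_on_C": getattr(C, "base_record", None),
        "mro": [cls.__name__ for cls in C.__mro__],
    }


report = describe_base_record()
print(report)
```

Locally, the value is stored only in {{B.__dict__}} and {{C}} reaches it through its MRO. Calling the same helper from inside {{f}} on a worker and comparing the two reports might show which part (the attribute on {{B}} or the base-class chain of {{C}}) does not survive serialization, assuming the helper itself can be shipped.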



> Generic annotation of class attribute in abstract class is NOT initialized in 
> inherited classes
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-42910
>                 URL: https://issues.apache.org/jira/browse/SPARK-42910
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.3.0, 3.3.2
>         Environment: Tested in two environments:
>  # Databricks
> Pyspark Version: 3.3.0
> Python Version: 3.9.15
>  # Local
> Pyspark Version: 3.3.2
> Python Version: 3.3.10
>            Reporter: Jon Farzanfar
>            Priority: Minor
>
> We are trying to leverage generics to better type our code base. The example 
> below shows the problem we are having: without generics it works fine in 
> PySpark, but with generics it fails in PySpark, even though the same code 
> works locally when PySpark is not involved.
> Output for local: 
>  
> {code:java}
> <class '__main__.Foo'>{code}
>  
> Traceback for PySpark: 
> {code:java}
> AttributeError: type object 'C' has no attribute 'base_record'
>       at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
>       at 
> org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
>       at 
> org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
>       at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>       at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>       at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
>       at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
>       at 
> org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
>       at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
>       at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
>       at 
> org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>       at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>       at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>       at 
> org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>       at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
>       at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:136)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more {code}
>  
> Code:
>  
> {code:python}
> from abc import ABC
> from typing import Generic, TypeVar, Callable
> from operator import add
>
> from pyspark.sql import SparkSession
>
> T = TypeVar("T")
>
> class Foo:
>     ...
>
> class A(ABC, Generic[T]):
>     base_record: Callable[..., T]
>
> class B(A):
>     base_record = Foo
>
> class C(B):
>     ...
>
> def f(_: int) -> int:
>     print(C.base_record)
>     return 1
>
> spark = SparkSession\
>     .builder\
>     .appName("schema_test")\
>     .getOrCreate()
>
> spark.sparkContext.parallelize(range(1, 100)).map(f).reduce(add)
> {code}
>  
>  


