[ https://issues.apache.org/jira/browse/SPARK-42910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704572#comment-17704572 ]

Maciej Szymkiewicz commented on SPARK-42910:
--------------------------------------------

Thanks [~gurwls223]. I've only glanced over this, but an obvious observation is that
the type hierarchy is messed up on the worker. {{C.mro()}} is

{code:python}
[__main__.C, __main__.B, __main__.A, abc.ABC, typing.Generic, object]
{code}

at the point of definition / import, and

{code:python}
[<class 'abc.C'>, <class 'abc.ABC'>, <class 'typing.Generic'>, <class 'object'>]
{code}

on the worker.  This seems to be consistent across {{serializers}}, as far as I 
can tell.
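
For reference, a minimal sketch of how the two listings above can be compared side by side (assuming an active {{SparkSession}} named {{spark}} and the classes from the reproduction script below; the helper name {{worker_mro}} is mine):

{code:python}
# Render C.mro() as module-qualified names on the driver.
print("driver:", [f"{cls.__module__}.{cls.__qualname__}" for cls in C.mro()])

def worker_mro(_: int) -> str:
    # Render C.mro() the same way, but inside a task on a worker.
    return str([f"{cls.__module__}.{cls.__qualname__}" for cls in C.mro()])

# Run a single-element job so the function executes on a worker.
print("worker:", spark.sparkContext.parallelize([0]).map(worker_mro).first())
{code}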

It seems to me that {{B}} should be properly defined as a subclass of {{A[Foo]}}, i.e.

{code:python}
class B(A[Foo]):
    base_record = Foo
{code}

This also adjusts worker-side {{mro}} to 

{code:python}
[<class 'abc.C'>, <class '__main__.A'>, <class 'abc.ABC'>, <class 'typing.Generic'>, <class 'object'>]
{code}

but I don't see a problem with the definition of {{C}} itself.
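
For completeness, a sketch of the class definitions from the reproduction below with only that change applied (untested against the affected versions, so treat it as a workaround to verify rather than a confirmed fix):

{code:python}
from abc import ABC
from typing import Callable, Generic, TypeVar

T = TypeVar("T")

class Foo:
    ...

class A(ABC, Generic[T]):
    base_record: Callable[..., T]

# The only change: subclass the parametrized generic A[Foo]
# instead of the bare A, so the type argument is bound here.
class B(A[Foo]):
    base_record = Foo

class C(B):
    ...
{code}

The rest of the script (the {{SparkSession}} setup and the {{map}} / {{reduce}} job) stays unchanged.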


> Generic annotation of class attribute in abstract class is NOT initialized in inherited classes
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-42910
>                 URL: https://issues.apache.org/jira/browse/SPARK-42910
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.3.0, 3.3.2
>         Environment: Tested in two environments:
>  # Databricks
> Pyspark Version: 3.3.0
> Python Version: 3.9.15
>  # Local
> Pyspark Version: 3.3.2
> Python Version: 3.3.10
>            Reporter: Jon Farzanfar
>            Priority: Minor
>
> We are trying to leverage generics to better type our code base. The example
> below shows the problem we are having: without generics this works completely
> fine in PySpark, but with generics it doesn't, although the same code runs
> fine locally without PySpark.
> Output for the local run:
>
> {code:python}
> <class '__main__.Foo'>
> {code}
>
> Traceback for PySpark:
>
> {code:java}
> AttributeError: type object 'C' has no attribute 'base_record'
>       at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
>       at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
>       at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
>       at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
>       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>       at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>       at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
>       at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
>       at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
>       at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
>       at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
>       at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>       at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>       at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>       at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>       at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
>       at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:136)
>       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more
> {code}
>  
> Code:
>
> {code:python}
> from abc import ABC
> from typing import Generic, TypeVar, Callable
> from operator import add
> from pyspark.sql import SparkSession
>
> T = TypeVar("T")
>
> class Foo:
>     ...
>
> class A(ABC, Generic[T]):
>     base_record: Callable[..., T]
>
> class B(A):
>     base_record = Foo
>
> class C(B):
>     ...
>
> def f(_: int) -> int:
>     print(C.base_record)
>     return 1
>
> spark = SparkSession\
>     .builder\
>     .appName("schema_test")\
>     .getOrCreate()
>
> spark.sparkContext.parallelize(range(1, 100)).map(f).reduce(add)
> {code}