[ https://issues.apache.org/jira/browse/SPARK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973695#comment-16973695 ]

Maciej Szymkiewicz edited comment on SPARK-29748 at 11/13/19 9:11 PM:
----------------------------------------------------------------------

While this is a step in the right direction, I think it justifies a broader 
discussion about the purpose, API, and behavior guarantees of {{Row}}, 
especially if we're going to introduce diverging implementations, {{Row}} and 
{{LegacyRow}}.

Over the years Spark code has accumulated a lot of conflicting behaviors and 
special cases related to {{Row}}:
 * Sometimes {{Row}} fields are reordered (the subject of this JIRA), sometimes 
they are not.
 * Sometimes rows are treated as ordered products ({{tuple}}s), sometimes as 
unordered dictionaries.
 * We provide efficient access only by position, but the primary access method 
is by name.
 * etc.

Some of these unusual properties are well documented (but still confusing), 
others are not. For example, objects that are indistinguishable through the 
public API
 
{code:python}
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

a = Row(x1=1, x2="foo")
b = Row("x1", "x2")(1, "foo")

a == b
# True
type(a) == type(b)
# True

list(a.__fields__) == list(b.__fields__)  # Not really public, but just to make a point
{code}
cannot be substituted in practice.
{code:python}
schema = StructType([
    StructField("x2", StringType()),
    StructField("x1", IntegerType())])

spark.createDataFrame([a], schema)
# DataFrame[x2: string, x1: int]

spark.createDataFrame([b], schema)
# TypeError                                 Traceback (most recent call last)
# ...
# TypeError: field x1: IntegerType can not accept object 'foo' in type <class 'str'>
{code}
To make things even worse, the primary access method - by name (I don't have 
hard data here, but it is common both in the internal API and in code I've seen 
in the wild) - is _O(M)_, where _M_ is the width of the schema.
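Roughly speaking (a minimal illustration, not the actual PySpark source), a 
tuple-backed row that resolves names through a parallel list of field names 
pays a linear scan on every by-name access:
{code:python}
# Illustrative sketch only: a tuple-backed row with a parallel list of
# field names, so each by-name access scans the schema left to right.
class TupleRow(tuple):
    def __new__(cls, fields, values):
        row = super().__new__(cls, values)
        row.__fields__ = list(fields)
        return row

    def __getattr__(self, name):
        try:
            idx = self.__fields__.index(name)  # O(M) scan over the schema width
        except ValueError:
            raise AttributeError(name)
        return self[idx]                       # O(1) once the position is known

r = TupleRow(["x1", "x2"], [1, "foo"])
assert r.x2 == "foo"
{code}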

So if we're going to modify the core behavior (sorting), it makes sense to 
rethink the whole design.

Since the schema is carried around with each object and passed over the wire, 
we might as well convert {{Row}} into a proxy of {{OrderedDict}}, getting 
something along these lines:
{code:python}
import sys
from collections import OrderedDict

class Row:
    __slots__ = ["_store"]

    def __init__(self, *args, **kwargs):
        if args and kwargs:
            raise ValueError("Can not use both args "
                             "and kwargs to create Row")
        if args:
            self._store = OrderedDict.fromkeys(args)
        else:
            self._store = OrderedDict(kwargs)

    def __getattr__(self, x):
        try:
            return self._store[x]
        except KeyError:
            raise AttributeError(x)

    def __getitem__(self, x):
        if isinstance(x, int):
            return list(self._store.values())[x]
        else:
            return self._store[x]

    def __iter__(self):
        return iter(self._store.values())

    def __repr__(self):
        return "Row({})".format(", ".join(
            "{}={}".format(k, v) for k, v in self._store.items()
        ))

    def __len__(self):
        return len(self._store)

    def __call__(self, *args):
        if len(args) > len(self):
            raise ValueError("Can not create Row with fields %s, "
                             "expected %d values but got %s" % (
                                 self, len(self), args))

        self._store.update(zip(self._store.keys(), args))
        return self

    def __eq__(self, other):
        return isinstance(other, Row) and self._store == other._store

    @property
    def _fields(self):
        return self._store.keys()

    @staticmethod
    def _conv(obj):
        if isinstance(obj, Row):
            return obj.asDict(True)
        elif isinstance(obj, list):
            return [Row._conv(o) for o in obj]
        elif isinstance(obj, dict):
            return dict((k, Row._conv(v)) for k, v in obj.items())
        else:
            return obj

    def asDict(self, recursive=False):
        if recursive:
            result = OrderedDict.fromkeys(self._fields)
            for key in self._fields:
                result[key] = Row._conv(self._store[key])
            return result
        else:
            return self._store

    @classmethod
    def from_dict(cls, d):
        if sys.version_info >= (3, 6):
            if not isinstance(d, dict):
                raise ValueError(
                    "from_dict requires dict but got {}".format(
                        type(d)))
        else:
            if not isinstance(d, OrderedDict):
                raise ValueError(
                    "from_dict requires collections.OrderedDict "
                    "but got {}".format(type(d)))
        return cls(**d)
{code}
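For what it's worth, with a dictionary-backed {{Row}} along these lines, the 
two construction styles from the first example become genuinely 
interchangeable (a quick usage sketch against the code above):
{code:python}
# Hypothetical usage of the OrderedDict-backed Row sketched above.
a = Row(x1=1, x2="foo")
b = Row("x1", "x2")(1, "foo")
assert a == b                        # same fields, same values, same order
assert a.x2 == a["x2"] == "foo"      # access by name...
assert a[1] == "foo"                 # ...or by position
assert list(a._fields) == ["x1", "x2"]
assert Row.from_dict({"x1": 1, "x2": "foo"}) == a   # plain dict on Python 3.6+
{code}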
If we're committed to {{Row}} being a {{tuple}} (with _O(1)_ access by index), 
we could actually try to hack {{namedtuple}}:

{code:python}
from collections import namedtuple
import hashlib
import json

class Row:
    def __new__(cls, *args, **kwargs):
        if args and kwargs:
            raise ValueError("Can not use both args "
                             "and kwargs to create Row")
        if args:
            return _SchemaRegistry.schema(tuple(args))
        else:
            return _SchemaRegistry.make(tuple(kwargs.keys()), kwargs.values())

class _SchemaRegistry:
    registry = {}
    @classmethod
    def schema(cls, fields):
        if fields in cls.registry:
            return cls.registry[fields]
        else:
            m = hashlib.md5()
            m.update(json.dumps(fields).encode())
            suffix = m.hexdigest()

            reducer = lambda self: (cls.make, (self._fields, tuple(self)))
            # TODO Add recursive case
            asDict = lambda self: self._asdict()

            schema = type(
                "Row",
                (namedtuple("Row_{}".format(suffix), fields), Row),
                {"__reduce__": reducer, "asDict": asDict})
            cls.registry[fields] = schema  # Idempotent so we don't need lock
            return schema

    @classmethod
    def make(cls, fields, values):
        return cls.schema(fields)(*values)
{code}
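Again just a sketch, but a quick usage example of the registry-based variant 
(same caveats as above):
{code:python}
# Hypothetical usage of the namedtuple-registry Row sketched above.
r1 = Row(x1=1, x2="foo")           # kwargs form
r2 = Row("x1", "x2")(1, "foo")     # factory form reuses the cached class
assert type(r1) is type(r2)        # one class per distinct field tuple
assert r1 == r2                    # plain tuple equality
assert r1[0] == 1 and r1.x2 == "foo"   # O(1) by index, attribute by name
assert dict(r1.asDict()) == {"x1": 1, "x2": "foo"}
{code}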


> Remove sorting of fields in PySpark SQL Row creation
> ----------------------------------------------------
>
>                 Key: SPARK-29748
>                 URL: https://issues.apache.org/jira/browse/SPARK-29748
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.0.0
>            Reporter: Bryan Cutler
>            Priority: Major
>
> Currently, when a PySpark Row is created with keyword arguments, the fields 
> are sorted alphabetically. This has created a lot of confusion with users 
> because it is not obvious (although it is stated in the pydocs) that they 
> will be sorted alphabetically, and then an error can occur later when 
> applying a schema and the field order does not match.
> The original reason for sorting fields is that kwargs in Python < 3.6 are 
> not guaranteed to be in the same order that they were entered. Sorting 
> alphabetically would ensure a consistent order. Matters are further 
> complicated with the flag {{__from_dict__}} that allows the {{Row}} fields 
> to be referenced by name when made by kwargs, but this flag is not serialized 
> with the Row and leads to inconsistent behavior.
> This JIRA proposes that any sorting of the fields is removed. Users with 
> Python 3.6+ creating Rows with kwargs can continue to do so since Python will 
> ensure the order is the same as entered. Users with Python < 3.6 will have to 
> create Rows with an OrderedDict or by using the Row class as a factory 
> (explained in the pydoc). If kwargs are used, an error will be raised, or, 
> based on a conf setting, it can fall back to a LegacyRow that will sort the 
> fields as before. This LegacyRow will be immediately deprecated and removed 
> once support for Python < 3.6 is dropped.


