[ https://issues.apache.org/jira/browse/SPARK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973695#comment-16973695 ]
Maciej Szymkiewicz edited comment on SPARK-29748 at 11/13/19 9:11 PM:
----------------------------------------------------------------------

While this is a step in the right direction, I think it justifies a broader discussion about {{Row}} purpose, API, and behavior guarantees, especially if we're going to introduce diverging implementations with {{Row}} and {{LegacyRow}}. Over the years Spark code has accumulated a lot of conflicting behaviors and special cases related to {{Row}}:
* Sometimes {{Row}} fields are reordered (the subject of this JIRA), sometimes they are not.
* Sometimes {{Row}} objects are treated as ordered products ({{tuples}}), sometimes as unordered dictionaries.
* We provide efficient access only by position, but the primary access method is by name.
* etc.

Some of these unusual properties are well documented (but still confusing), others are not. For example, objects that are indistinguishable using the public API

{code:python}
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

a = Row(x1=1, x2="foo")
b = Row("x1", "x2")(1, "foo")

a == b                                    # True
type(a) == type(b)                        # True
list(a.__fields__) == list(b.__fields__)  # Not really public, but just to make a point
{code}

cannot be substituted in practice:

{code:python}
schema = StructType([
    StructField("x2", StringType()),
    StructField("x1", IntegerType())])

spark.createDataFrame([a], schema)
# DataFrame[x2: string, x1: int]

spark.createDataFrame([b], schema)
# TypeError                                 Traceback (most recent call last)
# ...
# TypeError: field x1: IntegerType can not accept object 'foo' in type <class 'str'>
{code}

To make things even worse, the primary access method - by name (I don't have hard data here, but it is common both in the internal API and in the code I've seen in the wild) - is _O(M)_, where _M_ is the width of the schema. So if we're going to modify the core behavior (sorting), it makes sense to rethink the whole design.
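To make the last point concrete, here is a minimal sketch of the cost difference, assuming the current by-name path resolves the field with a linear scan over {{__fields__}} (the exact lookup code may differ between versions, but the asymptotics are the point):

{code:python}
import timeit

from pyspark.sql import Row

# Build a deliberately wide Row: fields x0 .. x9999.
M = 10000
wide = Row(*["x{}".format(i) for i in range(M)])(*range(M))

# Access by position goes straight to the underlying tuple.
by_index = timeit.timeit(lambda: wide[M - 1], number=10000)

# Access by name first has to locate the field, scanning up to M entries.
by_name = timeit.timeit(lambda: wide.x9999, number=10000)

print("by index: {:.4f}s, by name: {:.4f}s".format(by_index, by_name))
{code}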
Since the schema is carried around with each object and passed over the wire, we might as well convert {{Row}} into a proxy of {{OrderedDict}}, getting something along these lines:

{code:python}
import sys
from collections import OrderedDict


class Row:
    __slots__ = ["_store"]

    def __init__(self, *args, **kwargs):
        if args and kwargs:
            raise ValueError("Can not use both args "
                             "and kwargs to create Row")
        if args:
            # Field names only - values are filled in later via __call__
            self._store = OrderedDict.fromkeys(args)
        else:
            self._store = OrderedDict(kwargs)

    def __getattr__(self, x):
        return self._store[x]

    def __getitem__(self, x):
        if isinstance(x, int):
            return list(self._store.values())[x]
        else:
            return self._store[x]

    def __iter__(self):
        return iter(self._store.values())

    def __repr__(self):
        return "Row({})".format(", ".join(
            "{}={}".format(k, v) for k, v in self._store.items()
        ))

    def __len__(self):
        return len(self._store)

    def __call__(self, *args):
        if len(args) > len(self):
            raise ValueError(
                "Can not create Row with fields %s, expected %d values "
                "but got %s" % (self, len(self), args))
        self._store.update(zip(self._store.keys(), args))
        return self

    def __eq__(self, other):
        return isinstance(other, Row) and self._store == other._store

    @property
    def _fields(self):
        return self._store.keys()

    @staticmethod
    def _conv(obj):
        if isinstance(obj, Row):
            return obj.asDict(True)
        elif isinstance(obj, list):
            return [Row._conv(o) for o in obj]
        elif isinstance(obj, dict):
            return dict((k, Row._conv(v)) for k, v in obj.items())
        else:
            return obj

    def asDict(self, recursive=False):
        if recursive:
            result = OrderedDict.fromkeys(self._fields)
            for key in self._fields:
                result[key] = Row._conv(self._store[key])
            return result
        else:
            return self._store

    @classmethod
    def from_dict(cls, d):
        if sys.version_info >= (3, 6):
            if not isinstance(d, dict):
                raise ValueError(
                    "from_dict requires dict but got {}".format(type(d)))
        else:
            if not isinstance(d, OrderedDict):
                raise ValueError(
                    "from_dict requires collections.OrderedDict but got {}".format(type(d)))
        return cls(**d)
{code}

If we're committed to {{Row}} being a {{tuple}} (with _O(1)_ access by index) we could actually try to hack {{namedtuple}}:

{code:python}
from collections import namedtuple
import hashlib
import json


class Row:
    def __new__(cls, *args, **kwargs):
        if args and kwargs:
            raise ValueError("Can not use both args "
                             "and kwargs to create Row")
        if args:
            # Field names only - return the schema class itself, to be called with values
            return _SchemaRegistry.schema(tuple(args))
        else:
            return _SchemaRegistry.make(tuple(kwargs.keys()), kwargs.values())


class _SchemaRegistry:
    registry = {}

    @classmethod
    def schema(cls, fields):
        if fields in cls.registry:
            return cls.registry[fields]
        else:
            # Derive a stable suffix from the field names
            m = hashlib.md5()
            m.update(json.dumps(fields).encode())
            suffix = m.hexdigest()
            # TODO Add recursive case
            reducer = lambda self: (cls.make, (self._fields, tuple(self)))
            asDict = lambda self: self._asdict()
            schema = type(
                "Row",
                (namedtuple("Row_{}".format(suffix), fields), Row),
                {"__reduce__": reducer, "asDict": asDict})
            cls.registry[fields] = schema  # Idempotent so we don't need lock
            return schema

    @classmethod
    def make(cls, fields, values):
        return cls.schema(fields)(*values)
{code}
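For illustration, here is how the namedtuple-backed sketch above would be used; everything below refers only to the definitions in that sketch (not to the existing {{pyspark.sql.Row}}), and the kwargs form assumes Python 3.6+ keyword-argument ordering:

{code:python}
# Hypothetical usage of the registry-backed Row sketch defined above.
a = Row(x1=1, x2="foo")          # kwargs path: fields keep insertion order (3.6+)
b = Row("x1", "x2")(1, "foo")    # factory path: the same schema class is reused

assert type(a) is type(b)        # one generated class per distinct field tuple
assert a == b                    # plain tuple equality
assert a.x2 == "foo"             # attribute access, O(1) via namedtuple
assert a[1] == "foo"             # positional access, O(1) via tuple

# __reduce__ routes reconstruction through the registry, so a serialized
# Row can be rebuilt on the other side without shipping the class itself.
func, args = a.__reduce__()
assert func(*args) == a
{code}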
> Remove sorting of fields in PySpark SQL Row creation
> ----------------------------------------------------
>
>                 Key: SPARK-29748
>                 URL: https://issues.apache.org/jira/browse/SPARK-29748
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.0.0
>            Reporter: Bryan Cutler
>            Priority: Major
>
> Currently, when a PySpark Row is created with keyword arguments, the fields are sorted alphabetically. This has created a lot of confusion with users because it is not obvious (although it is stated in the pydocs) that they will be sorted alphabetically, and then an error can occur later when applying a schema and the field order does not match.
> The original reason for sorting fields is that kwargs in Python < 3.6 are not guaranteed to be in the same order that they were entered. Sorting alphabetically would ensure a consistent order. Matters are further complicated by the flag {{__from_dict__}} that allows the {{Row}} fields to be referenced by name when made by kwargs, but this flag is not serialized with the Row and leads to inconsistent behavior.
> This JIRA proposes that any sorting of the fields is removed. Users with Python 3.6+ creating Rows with kwargs can continue to do so since Python will ensure the order is the same as entered. Users with Python < 3.6 will have to create Rows with an OrderedDict or by using the Row class as a factory (explained in the pydoc). If kwargs are used, an error will be raised or, based on a conf setting, it can fall back to a LegacyRow that will sort the fields as before. This LegacyRow will be immediately deprecated and removed once support for Python < 3.6 is dropped.
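For reference, a minimal illustration of the pre-change behavior described above (output shown for a Spark version that still sorts kwargs fields; after this change the insertion order would be preserved instead):

{code:python}
from pyspark.sql import Row

r = Row(x2="foo", x1=1)
r             # Row(x1=1, x2='foo')  <- fields reordered alphabetically
r.__fields__  # ['x1', 'x2']
{code}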