[jira] [Comment Edited] (SPARK-29748) Remove sorting of fields in PySpark SQL Row creation
[ https://issues.apache.org/jira/browse/SPARK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973695#comment-16973695 ] Maciej Szymkiewicz edited comment on SPARK-29748 at 11/13/19 9:13 PM: -- While this is a step in the right direction I think it justifies a broader discussion about {{Row}} purpose, API, and behavior guarantees. Especially if we're going to introduce diverging implementations, with {{Row}} and {{LegacyRow}}. Over the years Spark code have accumulated a lot of conflicting behaviors and special cases related to {{Row}}: * Sometimes {{Row}} are reordered (subject of this JIRA), sometimes are not. * Sometimes there are treated as ordered products ({{tuples}}), sometimes as unordered dictionaries. * We provide efficient access only by position, but the primary access method is by name. * etc. Some of the unusual properties, are well documented (but still confusing), other are not. For example objects that are indistinguishable using public API {code:python} from pyspark.sql.types import StructType, StructField, IntegerType, StringType a = Row(x1=1, x2="foo") b = Row("x1", "x2")(1, "foo") a == b # True type(a) == type(b) #True list(a.__fields__) == list(b.__fields__) # Not really public, but just to make a point {code} cannot be substituted in practice. {code:python} schema = StructType([ StructField("x2", StringType()), StructField("x1", IntegerType())]) spark.createDataFrame([a], schema) # DataFrame[x2: string, x1: int] spark.createDataFrame([b], schema) # TypeError Traceback (most recent call last) # ... # TypeError: field x1: IntegerType can not accept object 'foo' in type {code} To make things even worse the primary (while I don't have hard data here, but it is common both in the internal API as well as the code I've seen in the wild) access method - by name - is _O(M)_ where is the width of schema. So if we're going to modify the core behavior (sorting) it makes sense to rethink the whole design. Since the schema is carried around with each object and pass over the wire we might as well convert {{Row}} into a proxy of {{OrderedDict}} getting something around these lines: {code:python} import sys from collections import OrderedDict class Row: slots = ["_store"] def __init__(self, *args, **kwargs): if args and kwargs: raise ValueError("Can not use both args " "and kwargs to create Row") if args: self._store = OrderedDict.fromkeys(args) else: self._store = OrderedDict(kwargs) def __getattr__(self, x): return self._store[x] def __getitem__(self, x): if isinstance(x, int): return list(self._store.values())[x] else: return self._store[x] def __iter__(self): return iter(self._store.values()) def __repr__(self): return "Row({})".format(", ".join( "{}={}".format(k, v) for k, v in self._store.items() )) def __len__(self): return len(self._store) def __call__(self, *args): if len(args) > len(self): raise ValueError("Can not create Row with fields %s, expected %d values " "but got %s" % (self, len(self), args)) self._store.update(zip(self._store.keys(), args)) return self def __eq__(self, other): return isinstance(other, Row) and self._store == other._store @property def _fields(self): return self._store.keys() @staticmethod def _conv(obj): if isinstance(obj, Row): return obj.asDict(True) elif isinstance(obj, list): return [conv(o) for o in obj] elif isinstance(obj, dict): return dict((k, conv(v)) for k, v in obj.items()) else: return obj def asDict(self, recursive=False): if recursive: result = OrderedDict.fromkeys(self._fields) for key in self._fields: result[key] = Row._conv(self._store[key]) return result else: return self._store @classmethod def from_dict(cls, d): if sys.version_info >= (3, 6): if not(isinstance(d, dict)): raise ValueError( "from_dict requires dict but got {}".format( type(d))) else: if not(isinstance(d, OrderedDict)): raise ValueError( "from_dict requires collections.OrderedDict {}".format( type(d))) return cls(**d) {code} If we're committed to {{Row}} being a {{tuple}} (with _O(1)_ by index
[jira] [Comment Edited] (SPARK-29748) Remove sorting of fields in PySpark SQL Row creation
[ https://issues.apache.org/jira/browse/SPARK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973695#comment-16973695 ] Maciej Szymkiewicz edited comment on SPARK-29748 at 11/13/19 9:11 PM: -- While this is a step in the right direction I think it justifies a broader discussion about {{Row}} purpose, API, and behavior guarantees. Especially if we're going to introduce diverging implementations, with {{Row}} and {{LegacyRow}}. Over the years Spark code have accumulated a lot of conflicting behaviors and special cases related to {{Row}}: * Sometimes {{Row}} are reordered (subject of this JIRA), sometimes are not. * Sometimes there are treated as ordered products ({{tuples}}), sometimes as unordered dictionaries. * We provide efficient access only by position, but the primary access method is by name. * etc. Some of the unusual properties, are well documented (but still confusing), other are not. For example objects that are indistinguishable using public API {code:python} from pyspark.sql.types import StructType, StructField, IntegerType, StringType a = Row(x1=1, x2="foo") b = Row("x1", "x2")(1, "foo") a == b # True type(a) == type(b) #True list(a.__fields__) == list(b.__fields__) # Not really public, but just to make a point {code} cannot be substituted in practice. {code:python} schema = StructType([ StructField("x2", StringType()), StructField("x1", IntegerType())]) spark.createDataFrame([a], schema) # DataFrame[x2: string, x1: int] spark.createDataFrame([b], schema) # TypeError Traceback (most recent call last) # ... # TypeError: field x1: IntegerType can not accept object 'foo' in type {code} To make things even worse the primary (while I don't have hard data here, but it is common both in the internal API as well as the code I've seen in the wild) access method - by name - is _O(M)_ where is the width of schema. So if we're going to modify the core behavior (sorting) it makes sense to rethink the whole design. Since the schema is carried around with each object and pass over the wire we might as well convert {{Row}} into a proxy of {{OrderedDict}} getting something around these lines: {code:python} import sys from collections import OrderedDict class Row: slots = ["_store"] def __init__(self, *args, **kwargs): if args and kwargs: raise ValueError("Can not use both args " "and kwargs to create Row") if args: self._store = OrderedDict.fromkeys(args) else: self._store = OrderedDict(kwargs) def __getattr__(self, x): return self._store[x] def __getitem__(self, x): if isinstance(x, int): return list(self._store.values())[x] else: return self._store[x] def __iter__(self): return iter(self._store.values()) def __repr__(self): return "Row({})".format(", ".join( "{}={}".format(k, v) for k, v in self._store.items() )) def __len__(self): return len(self._store) def __call__(self, *args): if len(args) > len(self): raise ValueError("Can not create Row with fields %s, expected %d values " "but got %s" % (self, len(self), args)) self._store.update(zip(self._store.keys(), args)) return self def __eq__(self, other): return isinstance(other, Row) and self._store == other._store @property def _fields(self): return self._store.keys() @staticmethod def _conv(obj): if isinstance(obj, Row): return obj.asDict(True) elif isinstance(obj, list): return [conv(o) for o in obj] elif isinstance(obj, dict): return dict((k, conv(v)) for k, v in obj.items()) else: return obj def asDict(self, recursive=False): if recursive: result = OrderedDict.fromkeys(self._fields) for key in self._fields: result[key] = Row._conv(self._store[key]) return result else: return self._store @classmethod def from_dict(cls, d): if sys.version_info >= (3, 6): if not(isinstance(d, dict)): raise ValueError( "from_dict requires dict but got {}".format( type(d))) else: if not(isinstance(d, OrderedDict)): raise ValueError( "from_dict requires collections.OrderedDict {}".format( type(d))) return cls(**d) {code} If we're committed to {{Row}} being a {{tuple}} (with _O(1)_ by index