[jira] [Comment Edited] (SPARK-29748) Remove sorting of fields in PySpark SQL Row creation

2019-11-13 Thread Maciej Szymkiewicz (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973695#comment-16973695
 ] 

Maciej Szymkiewicz edited comment on SPARK-29748 at 11/13/19 9:13 PM:
--

While this is a step in the right direction, I think it justifies a broader 
discussion about the purpose, API, and behavior guarantees of {{Row}}, 
especially if we're going to introduce diverging implementations, with 
{{Row}} and {{LegacyRow}}.

Over the years Spark code has accumulated a lot of conflicting behaviors and 
special cases related to {{Row}}:
 * Sometimes {{Row}} fields are reordered (the subject of this JIRA), sometimes they are not.
 * Sometimes rows are treated as ordered products ({{tuples}}), sometimes as 
unordered dictionaries.
 * We provide efficient access only by position, but the primary access method 
is by name.
 * etc.

Some of the unusual properties are well documented (but still confusing), 
others are not. For example, objects that are indistinguishable using the public API

 
{code:python}
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

a = Row(x1=1, x2="foo")
b = Row("x1", "x2")(1, "foo")

a == b
# True
type(a) == type(b)
# True

# Not really public, but just to make a point
list(a.__fields__) == list(b.__fields__)
{code}
cannot be substituted in practice.
{code:python}
# Assumes an active SparkSession bound to `spark`
schema = StructType([
    StructField("x2", StringType()),
    StructField("x1", IntegerType())])

spark.createDataFrame([a], schema)
# DataFrame[x2: string, x1: int]

spark.createDataFrame([b], schema)
# TypeError Traceback (most recent call last)
# ...
# TypeError: field x1: IntegerType can not accept object 'foo' in type 
{code}
To make things even worse, the primary access method, by name, is _O(M)_, 
where _M_ is the width of the schema. (I don't have hard data here, but access 
by name is common both in the internal API and in the code I've seen in the wild.)
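
To illustrate the cost, here is a minimal sketch of name lookup on a tuple that carries a separate list of field names. {{TupleRow}} and {{by_name}} are hypothetical stand-ins, not the actual PySpark class; the sketch assumes that access by name resolves the position with a linear {{list.index}} scan, which is what makes it _O(M)_, while a plain dict lookup is _O(1)_ on average.

```python
class TupleRow(tuple):
    """Hypothetical stand-in for a tuple-backed row with named fields."""

    def __new__(cls, names, values):
        row = super().__new__(cls, values)
        row.__fields__ = list(names)
        return row

    def by_name(self, name):
        # list.index scans the field names one by one: O(M) in the schema width
        return self[self.__fields__.index(name)]


names = ["x{}".format(i) for i in range(1000)]
values = list(range(1000))

tuple_row = TupleRow(names, values)
dict_row = dict(zip(names, values))

# Both lookups return the same value, but the tuple variant walks past
# 999 names first, while the dict hashes the key directly.
last_from_tuple = tuple_row.by_name("x999")
last_from_dict = dict_row["x999"]
```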

So if we're going to modify the core behavior (sorting) it makes sense to 
rethink the whole design.

Since the schema is carried around with each object and passed over the wire, 
we might as well convert {{Row}} into a proxy of {{OrderedDict}}, getting 
something along these lines:
{code:python}
import sys
from collections import OrderedDict


class Row:
    __slots__ = ["_store"]

    def __init__(self, *args, **kwargs):
        if args and kwargs:
            raise ValueError("Can not use both args "
                             "and kwargs to create Row")
        if args:
            # Field names only; values are filled in later by __call__
            self._store = OrderedDict.fromkeys(args)
        else:
            self._store = OrderedDict(kwargs)

    def __getattr__(self, x):
        # Only reached when normal attribute lookup fails
        try:
            return self._store[x]
        except KeyError:
            raise AttributeError(x)

    def __getitem__(self, x):
        if isinstance(x, int):
            return list(self._store.values())[x]
        else:
            return self._store[x]

    def __iter__(self):
        return iter(self._store.values())

    def __repr__(self):
        return "Row({})".format(", ".join(
            "{}={}".format(k, v) for k, v in self._store.items()
        ))

    def __len__(self):
        return len(self._store)

    def __call__(self, *args):
        if len(args) > len(self):
            raise ValueError(
                "Can not create Row with fields %s, expected %d values "
                "but got %s" % (self, len(self), args))

        self._store.update(zip(self._store.keys(), args))
        return self

    def __eq__(self, other):
        return isinstance(other, Row) and self._store == other._store

    @property
    def _fields(self):
        return self._store.keys()

    @staticmethod
    def _conv(obj):
        # Recursively convert nested Rows, lists and dicts
        if isinstance(obj, Row):
            return obj.asDict(True)
        elif isinstance(obj, list):
            return [Row._conv(o) for o in obj]
        elif isinstance(obj, dict):
            return dict((k, Row._conv(v)) for k, v in obj.items())
        else:
            return obj

    def asDict(self, recursive=False):
        if recursive:
            result = OrderedDict.fromkeys(self._fields)
            for key in self._fields:
                result[key] = Row._conv(self._store[key])
            return result
        else:
            return self._store

    @classmethod
    def from_dict(cls, d):
        # Plain dicts preserve insertion order only on Python 3.6+
        if sys.version_info >= (3, 6):
            if not isinstance(d, dict):
                raise ValueError(
                    "from_dict requires dict but got {}".format(
                        type(d)))
        else:
            if not isinstance(d, OrderedDict):
                raise ValueError(
                    "from_dict requires collections.OrderedDict "
                    "but got {}".format(type(d)))
        return cls(**d)
{code}
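
One consequence of delegating {{__eq__}} to the underlying {{OrderedDict}} is worth spelling out: comparison between two {{OrderedDict}} instances is order-sensitive, so rows holding the same fields in a different order would no longer compare equal. A small standard-library check (the variable names are only for illustration):

```python
from collections import OrderedDict

left = OrderedDict([("x1", 1), ("x2", "foo")])
right = OrderedDict([("x2", "foo"), ("x1", 1)])

# Comparison between two OrderedDicts takes key order into account...
same_as_ordered = left == right
# ...while comparison as plain dicts ignores it.
same_as_plain = dict(left) == dict(right)

# Positional access follows insertion order.
first_left = list(left.values())[0]
first_right = list(right.values())[0]
```

Whether order-sensitive equality is the desired guarantee is part of the broader discussion; the sketch only shows what the proxy would inherit by default.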
If we're committed to {{Row}} being a {{tuple}} (with _O(1)_ by index 
