[jira] [Updated] (SPARK-27613) Caching an RDD composed of Row Objects produces some kind of key recombination

2019-04-30 Thread Andres Fernandez (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andres Fernandez updated SPARK-27613:
-
Description: 
(Code included at the bottom)

The function "+create_dataframes_from_azure_responses_rdd+" receives 
*table_names* (_list_ of _str_) and *responses_rdd* (RDD of tuples <_str_, 
_str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It then 
iterates over the table names and creates one DataFrame per table, filtering 
the RDD by the first tuple element and keeping only non-null responses.

So far so good.

A _QueryResponse_ object (from the azure.loganalytics package) essentially 
contains a list with one "_table_", which in turn has a "_columns_" and a 
"_rows_" field. Every response (fifth element of the tuple, [4]) for the same 
table name (first element of the tuple, [0]) has exactly the same columns in 
the same order (the order only matters for matching each row's values to 
their columns within the same response). The types are stored in 
*column_types*, taking the first response as the sample.
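
To make the shape concrete, here is a minimal sketch using plain Python stand-ins. The `Column`, `Table` and `Response` classes, the `rows_as_dicts` helper, the "status" column and the sample values are all hypothetical placeholders, not the azure.loganalytics types or real data. It only illustrates how each row's values are paired with the column names by position and how *column_types* is derived.

```python
from collections import namedtuple

# Hypothetical stand-ins for the azure.loganalytics response objects.
Column = namedtuple("Column", ["name", "type"])
Table = namedtuple("Table", ["columns", "rows"])
Response = namedtuple("Response", ["tables"])


def rows_as_dicts(response, workspace_id, ws_column_name="WorkspaceId"):
    """Pair each row's values with the column names by position."""
    table = response.tables[0]
    names = [c.name for c in table.columns]
    return [
        {ws_column_name: workspace_id, **dict(zip(names, row))}
        for row in table.rows
    ]


# Made-up sample response with two columns and two rows.
sample = Response(tables=[Table(
    columns=[Column("status", "string"), Column("record_count", "long")],
    rows=[["n/a", 3], ["ok", 5]],
)])

records = rows_as_dicts(sample, "ws-1")
# Column name -> Azure type name, taken from the sample response.
column_types = {c.name: c.type for c in sample.tables[0].columns}
```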

Now to the tricky part.

flatMap is called on the filtered *responses_rdd* with the function 
"+tabularize_response_rdd+", which creates a Row object for every row 
(_list_ of _str_) in the _QueryResponse_. A schema is built from a 
*type_map* (Azure types to spark.sql.types) so that it can be passed to the 
subsequent createDataFrame call. If the result of this flatMap, 
*table_tabular_rdd*, is *not* cached before creating the DataFrame from the 
Row RDD, everything works smoothly. However, if *table_tabular_rdd* is cached 
before creating the DataFrame, the keys and values of the Row objects no 
longer match.

It is worth pointing out that when a Row object is created from an unpacked 
dict, the code 
[here|https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375]
sorts the keys; is this behaviour somehow overridden by caching?
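
One possible way to avoid depending on how a Row orders its fields, offered as an untested sketch and not as a confirmed fix for this issue, would be to emit plain tuples whose values follow one explicit list of field names, and to build the schema from that same list. The helper name `to_ordered_tuple` and the sample record are hypothetical.

```python
def to_ordered_tuple(record, field_names):
    """Return the record's values in the exact order given by field_names."""
    return tuple(record[name] for name in field_names)


# Single source of truth for the column order (assumed, for illustration).
field_names = ["WorkspaceId", "record_count", "status"]

# Made-up record whose dict order differs from the desired column order.
record = {"status": "n/a", "record_count": 3, "WorkspaceId": "ws-1"}

ordered = to_ordered_tuple(record, field_names)
```

The same `field_names` list would then drive the StructType, so that position i in every tuple corresponds to field i in the schema.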

Please let me know what I am doing wrong. Is there a best practice or 
documented solution I am not following? I'm just a beginner when it comes to 
Spark and would happily accept any suggestion. I hope I was clear enough, and I 
am happy to provide any additional details that might be helpful. Thank you! 
(Code and error attached as well.)

The error looks as if it were related to casting, but the contents clearly do 
not correspond to the key: *record_count* is actually a Long, but in the Row 
its value somehow got swapped for another key's value, in this case 'n/a'.
{code:python}
import pyspark
from pyspark.sql.types import StructField, StructType

# `spark` (the SparkSession) and `type_map` (Azure type name -> spark.sql.types class)
# are defined elsewhere and are not shown here.
def create_dataframes_from_azure_responses_rdd(table_names: list, responses_rdd: pyspark.RDD, verbose: bool = False) -> dict:
  ws_column_name = "WorkspaceId"

  def tabularize_response_rdd(x: tuple):
    # Turn one (table name, workspace id, count, interval, response) tuple into a list of Rows.
    import pyspark
    tn, wsid, count, interval, response = x
    ret = []
    if response.tables[0].rows:
      ret = [pyspark.sql.Row(**{ws_column_name: wsid, **{fi.name: r[i] for i, fi in enumerate(response.tables[0].columns)}}) for r in response.tables[0].rows]
    return ret

  data_frames = {}
  for tn in table_names:
    if verbose: print("Filtering RDD items for {}".format(tn))
    table_response_rdd = responses_rdd.filter(lambda x: x[0] == tn and x[4] is not None).cache()

    data_frames[tn] = None
    if verbose: print("Checking if RDD for {} has data".format(tn))
    if not table_response_rdd.isEmpty():
      if verbose: print("Getting column types for {} from azure response".format(tn))
      column_types = {f.name: f.type for f in table_response_rdd.take(1)[0][4].tables[0].columns}
      column_types[ws_column_name] = "string"
      if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
      table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd)  #.cache() #Error with cache, no error without!
      if verbose: print("Getting sample row for {}".format(tn))
      row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
      if verbose: print("Building schema for {} from sample row and column types".format(tn))
      current_schema = StructType([StructField(f, type_map[column_types[f]](), True) for f in row_fields])
      if verbose: print("Creating dataframe for {}".format(tn))
      table_df = spark.createDataFrame(table_tabular_rdd, schema=current_schema).cache()
      if verbose: print("Calculating expected count for {}".format(tn))
      expected_count = table_response_rdd.map(lambda x: (x[1], x[2])).distinct().map(lambda x: x[1]).sum()
      real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
      table_response_rdd.unpersist()
      #table_tabular_rdd.unpersist()
      if verbose: print("Expected count {} vs Real count {}".format(expected_count, real_count))
      data_frames[tn] = table_df
    else:
      if verbose: print("{} table was empty!".format(tn))
  return data_frames
{code}

[jira] [Updated] (SPARK-27613) Caching an RDD composed of Row Objects produces some kind of key recombination

2019-04-30 Thread Andres Fernandez (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andres Fernandez updated SPARK-27613:
-
Component/s: (was: Spark Core)
 PySpark

> Caching an RDD composed of Row Objects produces some kind of key recombination
> --
>
> Key: SPARK-27613
> URL: https://issues.apache.org/jira/browse/SPARK-27613
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.4.0
> Reporter: Andres Fernandez
> Priority: Major
>
> (Code included at the bottom)
> The function "+create_dataframes_from_azure_responses_rdd+" receives 
> *table_names* (_list_ of _str_) and *responses_rdd* (RDD of tuples <_str_, 
> _str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It then 
> iterates over the table names and creates one DataFrame per table, filtering 
> the RDD by the first tuple element and keeping only non-null responses.
> So far so good.
> A QueryResponse object (from the azure.loganalytics package) essentially 
> contains a list with one "_table_", which in turn has a "_columns_" and a 
> "_rows_" field. Every response (fifth element of the tuple, [4]) for the same 
> table name (first element of the tuple, [0]) has exactly the same columns in 
> the same order (the order only matters for matching each row's values to 
> their columns within the same response). The types are stored in 
> *column_types*, taking the first response as the sample.
> Now to the tricky part.
> I call flatMap on the filtered *responses_rdd* with the function 
> "+tabularize_response_rdd+", which creates a Row object for every row 
> (_list_ of _str_) in the _QueryResponse_. I also build the schema from a 
> *type_map* (Azure types to spark.sql.types) so that it can be passed to the 
> subsequent createDataFrame call. If the result of this flatMap, 
> *table_tabular_rdd*, is not cached before creating the DataFrame from the 
> Row RDD, everything works smoothly. However, if *table_tabular_rdd* is cached 
> before creating the DataFrame, the keys and values of the Row objects no 
> longer match.
> It is worth pointing out that when a Row object is created from an unpacked 
> dict, the code 
> [here|https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375]
> sorts the keys; is this behaviour somehow overridden by caching?
> Please let me know what I am doing wrong. Is there any best practice or 
> documented solution I am not following? I'm just a beginner when it comes to 
> Spark and would happily accept any suggestion. I hope I was clear enough, and 
> I am happy to provide any additional details that might be helpful. Thank 
> you! (Code and error attached as well.)
> The error looks as if it were related to casting, but the contents clearly do 
> not correspond to the key. The *record_count* key actually holds a Long, but 
> in the Row its value somehow got swapped for another key's value, in this 
> case 'n/a'.
> {code:java}
> import pyspark
> from pyspark.sql.types import StructType, StructField
> 
> # `spark` (the SparkSession) and `type_map` (Azure type name -> spark.sql.types
> # class) are defined outside this snippet.
> def create_dataframes_from_azure_responses_rdd(table_names: list,
>     responses_rdd: pyspark.RDD, verbose: bool = False) -> dict:
>   ws_column_name = "WorkspaceId"
> 
>   def tabularize_response_rdd(x: tuple):
>     # Build one Row per response row: WorkspaceId plus every column by name.
>     import pyspark
>     tn, wsid, count, interval, response = x
>     ret = []
>     if response.tables[0].rows:
>       columns = response.tables[0].columns
>       ret = [pyspark.sql.Row(**{ws_column_name: wsid,
>                                 **{fi.name: r[i] for i, fi in enumerate(columns)}})
>              for r in response.tables[0].rows]
>     return ret
> 
>   data_frames = {}
>   for tn in table_names:
>     if verbose: print("Filtering RDD items for {}".format(tn))
>     table_response_rdd = responses_rdd.filter(
>         lambda x: x[0] == tn and x[4] is not None).cache()
> 
>     data_frames[tn] = None
>     if verbose: print("Checking if RDD for {} has data".format(tn))
>     if not table_response_rdd.isEmpty():
>       if verbose: print("Getting column types for {} from azure response".format(tn))
>       column_types = {f.name: f.type
>                       for f in table_response_rdd.take(1)[0][4].tables[0].columns}
>       column_types[ws_column_name] = "string"
>       if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
>       table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) #.cache() #Error with cache, no error without!
>       if verbose: print("Getting sample row for {}".format(tn))
>       row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
>       if verbose: print("Building schema for {} from sample row and column types".format(tn))
>       current_schema = StructType(
>           [StructField(f, type_map[column_types[f]](), True) for f in row_fields])
>       if verbose: print("Creating dataframe for {}".format(tn))
>   

[jira] [Updated] (SPARK-27613) Caching an RDD composed of Row Objects produces some kind of key recombination

2019-04-30 Thread Andres Fernandez (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andres Fernandez updated SPARK-27613:
-
Description: 
(Code included at the bottom)

The function "+create_dataframes_from_azure_responses_rdd+" receives 
*table_names* (_list_ of _str_) and *responses_rdd* (RDD of tuples <_str_, 
_str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It 
iterates over the table names and creates one DataFrame per table, filtering 
the RDD by the first tuple element and keeping only non-null responses.

So far so good.

The _QueryResponse_ object (from the azure.loganalytics package) contains, 
essentially, a list with one "_table_", which in turn has a "_columns_" and a 
"_rows_" field. Every response (fifth element of the tuple, [4]) for the same 
table name (first element of the tuple, [0]) has exactly the same columns in 
the same order (the order matters only for indexing into the row data of that 
same response). The types are stored in *column_types*, taking the first 
response as the sample.

Now to the tricky part.

I call flatMap on the filtered *responses_rdd* (*table_response_rdd* in the 
code) with the function "+tabularize_response_rdd+", which creates a Row 
object for every row (_list_ of _str_) in the _QueryResponse_. I also create 
the schema based on a *type_map* from Azure types to spark.sql.types and pass 
it to the subsequent createDataFrame call. If the result of this flatMap, 
*table_tabular_rdd*, is not cached before creating the DataFrame from the Row 
RDD, everything works smoothly. However, if *table_tabular_rdd* is cached 
before creating the DataFrame, the values in the Row objects no longer match 
their keys.

It is worth pointing out that when a Row object is created from an unpacked 
dict, the code 
[here|https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375]
sorts the keys; is this behaviour somehow overridden by caching?
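
To make the sorting question concrete, here is a minimal pure-Python sketch of the behaviour described above: a row built from keyword arguments stores its values in sorted key order, regardless of the order in which they were passed. `make_sorted_row` is a hypothetical stand-in written for illustration, not PySpark's actual code, and the `status` field is made up.

```python
def make_sorted_row(**kwargs):
    """Hypothetical stand-in for a Row built from an unpacked dict:
    field names are sorted and values are stored in that sorted order."""
    names = sorted(kwargs)
    values = tuple(kwargs[name] for name in names)
    return names, values


# The same fields passed in two different orders...
first = make_sorted_row(WorkspaceId="ws-1", record_count=42, status="n/a")
second = make_sorted_row(status="n/a", record_count=42, WorkspaceId="ws-1")

# ...end up with identical field and value order.
print(first)
print(first == second)
```

Note that plain string sorting puts the capitalised `WorkspaceId` before the lowercase names, so the stored order is not necessarily the order a reader would expect.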

Please let me know what I am doing wrong. Is there any best practice or 
documented solution I am not following? I'm just a beginner when it comes to 
Spark and would happily accept any suggestion. I hope I was clear enough, and I 
am happy to provide any additional details that might be helpful. Thank you! 
(Code and error attached as well.)

The error looks as if it were related to casting, but the contents clearly do 
not correspond to the key. The *record_count* key actually holds a Long, but in 
the Row its value somehow got swapped for another key's value, in this case 
'n/a'.
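
One way such a swap could arise is sketched below in plain Python. It assumes that the schema's field order differs from the Row's internal (sorted) order and that values end up being matched to schema fields by position instead of by name. Whether caching actually causes positional matching in PySpark 2.4.0 is exactly the open question of this report; the helper functions and the `status` field are hypothetical.

```python
def pair_by_name(schema_fields, row_names, row_values):
    """Match each schema field to the row value that carries the same name."""
    lookup = dict(zip(row_names, row_values))
    return {field: lookup[field] for field in schema_fields}


def pair_by_position(schema_fields, row_values):
    """Match schema fields to row values purely by index."""
    return dict(zip(schema_fields, row_values))


# Row contents in sorted key order (hypothetical sample data).
row_names = ["WorkspaceId", "record_count", "status"]
row_values = ("ws-1", 42, "n/a")

# Schema built from keys that came back in a different order.
schema_fields = ["status", "WorkspaceId", "record_count"]

by_name = pair_by_name(schema_fields, row_names, row_values)
by_position = pair_by_position(schema_fields, row_values)

print(by_name["record_count"])      # value that belongs to record_count
print(by_position["record_count"])  # value taken from a neighbouring field
```

With name-based matching `record_count` keeps its numeric value; with positional matching it receives the string that belongs to another field, which would then fail a Long cast.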
{code:java}
import pyspark
from pyspark.sql.types import StructType, StructField

# `spark` (the SparkSession) and `type_map` (Azure type name -> spark.sql.types
# class) are defined outside this snippet.
def create_dataframes_from_azure_responses_rdd(table_names: list,
    responses_rdd: pyspark.RDD, verbose: bool = False) -> dict:
  ws_column_name = "WorkspaceId"

  def tabularize_response_rdd(x: tuple):
    # Build one Row per response row: WorkspaceId plus every column by name.
    import pyspark
    tn, wsid, count, interval, response = x
    ret = []
    if response.tables[0].rows:
      columns = response.tables[0].columns
      ret = [pyspark.sql.Row(**{ws_column_name: wsid,
                                **{fi.name: r[i] for i, fi in enumerate(columns)}})
             for r in response.tables[0].rows]
    return ret

  data_frames = {}
  for tn in table_names:
    if verbose: print("Filtering RDD items for {}".format(tn))
    table_response_rdd = responses_rdd.filter(
        lambda x: x[0] == tn and x[4] is not None).cache()

    data_frames[tn] = None
    if verbose: print("Checking if RDD for {} has data".format(tn))
    if not table_response_rdd.isEmpty():
      if verbose: print("Getting column types for {} from azure response".format(tn))
      column_types = {f.name: f.type
                      for f in table_response_rdd.take(1)[0][4].tables[0].columns}
      column_types[ws_column_name] = "string"
      if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
      table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) #.cache() #Error with cache, no error without!
      if verbose: print("Getting sample row for {}".format(tn))
      row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
      if verbose: print("Building schema for {} from sample row and column types".format(tn))
      current_schema = StructType(
          [StructField(f, type_map[column_types[f]](), True) for f in row_fields])
      if verbose: print("Creating dataframe for {}".format(tn))
      table_df = spark.createDataFrame(table_tabular_rdd, schema=current_schema).cache()
      if verbose: print("Calculating expected count for {}".format(tn))
      expected_count = (table_response_rdd.map(lambda x: (x[1], x[2]))
                        .distinct().map(lambda x: x[1]).sum())
      real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
      table_response_rdd.unpersist()
      #table_tabular_rdd.unpersist()
      if verbose: print("Expected count {} vs Real count {}".format(expected_count, real_count))
      data_frames[tn] = table_df
    else:
      if verbose: print("{} table was empty!".format(tn))
  return data_frames
{code}
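
A possible workaround, offered as an assumption and not as a confirmed fix: have the flatMap function emit plain tuples in one explicit field order, and build the schema from that same ordered list, so that position and name cannot disagree. `rows_as_tuples` and the sample data are hypothetical, and the Spark calls are left out so the sketch runs as plain Python.

```python
def rows_as_tuples(field_order, column_names, rows, extra):
    """Return each row as a tuple whose values follow field_order.

    column_names: names of the values inside each raw row, in row order.
    extra: additional name -> value pairs added to every row.
    """
    out = []
    for raw in rows:
        by_name = dict(zip(column_names, raw))
        by_name.update(extra)
        # Emit values in the one agreed order, independent of dict ordering.
        out.append(tuple(by_name[name] for name in field_order))
    return out


# One agreed field order, reused for both the tuples and the schema.
field_order = ["WorkspaceId", "record_count", "status"]

tuples = rows_as_tuples(
    field_order,
    column_names=["status", "record_count"],
    rows=[["n/a", 42], ["ok", 7]],
    extra={"WorkspaceId": "ws-1"},
)
print(tuples)
```

In the function above, the same `field_order` list would replace `row_fields` when constructing the StructType, so the schema no longer depends on the key order of a sampled Row.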