[ 
https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andres Fernandez updated SPARK-27613:
-------------------------------------
    Description: 
(Code included at the bottom)

The function "+create_dataframes_from_azure_responses_rdd+" receives 
*table_names* (_list_ of _str_) and *responses_rdd* (rdd of tuples <_str_, 
_str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It will 
then go ahead and iterate over the table names to create dataframes filtering 
the RDDs by the first element and valid response.

So far so good.

QueryResponse object (from azure.loganalytics package) contains, essentialy, a 
list with 1 "_table_" which in turn has a "_columns_" and a "_rows_" field. 
Every single response (fifth element of the tuple [4]) for the same table name 
(first element of the tuple [0]) has exactly the same columns in the same order 
(order is not important other thant to reference the rows data inside the same 
response anyways). The types are stored in *column_types* taking the first 
response as the sample.

Now to the tricky part.

I call flatMap on the *responses_rdd* with the function 
+"tabularize_response_rdd+" which basically creates a Row object for every row 
(_list_ of _str_) in the _QueryResponse_. I also create the schema based on a 
*type_map* from azure types to spark.sql.types in order to specify it to the 
subsequent createDataFrame instruction. If the result of this flatMap, 
*table_tabular_rdd*, is not cached before creating the DataFrame from the Rows 
RDD everything works smoothly. Nevertheless if the result of the flatMap, 
*table_tabular_rdd*, is cached the before creating the DataFrame a mismatch is 
evidenced between the actual key:values for the Row objects.

It would be good to point that when a Row Object is created from an unpacked 
dict the code in 
[[https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375]](here)
 sorts the keys; is this behaviour overriden somehow by caching?

Let me please know what I am doing wrong, is there any best practice / 
documented solution I am not following? Im just a beginner when it comes to 
Spark and would happily accept any suggestion. I hope I was clear enough, and I 
am open to give you any additional details that might be helpful. Thank you! 
(Code and error attached as well).

The error looks like if it was related to casting, but it can be seen that the 
contents do not correspond to the key. *record_count* key is actually a Long 
but in the Row it got somehow swapped for another key's value, in this case 
'n/a'.
{code:java}
def create_dataframes_from_azure_responses_rdd(table_names: list, 
responses_rdd: pyspark.rdd, verbose:bool=False) -> list:
  ws_column_name = "WorkspaceId"
  def tabularize_response_rdd(x: tuple):
    import pyspark
    tn, wsid, count, interval, response = x
    ret = []
    if response.tables[0].rows:
      ret = [pyspark.sql.Row(**{ws_column_name:wsid, **{fi.name:r[i] for i,fi 
in enumerate(response.tables[0].columns)}}) for r in response.tables[0].rows]
    return ret
  data_frames = {}
  for tn in table_names:
    if verbose: print("Filtering RDD items for {}".format(tn))
    table_response_rdd = responses_rdd.filter(lambda x: x[0]==tn and 
x[4]!=None).cache()
    
    data_frames[tn] = None
    if verbose: print("Checking if RDD for {} has data".format(tn))
    if not table_response_rdd.isEmpty():
      if verbose: print("Getting column types for {} from azure 
response".format(tn))
      column_types = {f.name:f.type for f in 
table_response_rdd.take(1)[0][4].tables[0].columns}
      column_types[ws_column_name] = "string"
      if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
      table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) 
#.cache() #Error with cache, no error without!
      if verbose: print("Getting sample row for {}".format(tn))
      row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
      if verbose: print("Building schema for {} from sample row and column 
types".format(tn))
      current_schema = StructType([StructField(f, type_map[column_types[f]](), 
True) for f in row_fields])
      if verbose: print("Creating dataframe for {}".format(tn))
      table_df = spark.createDataFrame(table_tabular_rdd, 
schema=current_schema).cache()
      if verbose: print("Calculating expected count for {}".format(tn))
      expected_count = table_response_rdd.map(lambda x: 
(x[1],x[2])).distinct().map(lambda x: x[1]).sum()
      real_count = 
table_df.select("record_count").groupBy().sum().collect()[0][0]
      table_response_rdd.unpersist()
      #table_tabular_rdd.unpersist()
      if verbose: print("Expected count {} vs Real count 
{}".format(expected_count, real_count))
      data_frames[tn]=table_df
    else:
      if verbose: print("{} table was empty!".format(tn))
  return data_frames
{code}
{noformat}
Py4JJavaError Traceback (most recent call last) <command-2824384765475765> in 
<module>()       1 resrdds = get_data_for_timespan_accross_laws(wss, tns, 1, 
sta, container, sta_key, tid, creds, 5000, True)       2 resrdds.cache() ----> 
3 dfs_raw = create_dataframes_from_azure_responses_rdd(tns, resrdds, True)      
 4 resrdds.unpersist() <command-2824384765475774> in 
create_dataframes_from_azure_responses_rdd(table_names, responses_rdd, verbose) 
     37 if verbose: print("Calculating expected count for {}".format(tn))      
38 expected_count = table_response_rdd.map(lambda x: 
(x[1],x[2])).distinct().map(lambda x: x[1]).sum() ---> 39       real_count = 
table_df.select("record_count").groupBy().sum().collect()[0][0]      40 
table_response_rdd.unpersist()      41 #table_tabular_rdd.unpersist() 
/databricks/spark/python/pyspark/sql/dataframe.py in collect(self)     546 # 
Default path used in OSS Spark / for non-DF-ACL clusters:     547 with 
SCCallSiteSync(self._sc) as css: --> 548             sock_info = 
self._jdf.collectToPython()     549 return list(_load_from_socket(sock_info, 
BatchedSerializer(PickleSerializer())))     550 
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in 
__call__(self, *args)    1255 answer = 
self.gateway_client.send_command(command)    1256 return_value = 
get_return_value( -> 1257             answer, self.gateway_client, 
self.target_id, self.name)
   1258    1259 for temp_arg in temp_args: 
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)      61 def 
deco(*a, **kw):      62 try: ---> 63             return f(*a, **kw)      64 
except py4j.protocol.Py4JJavaError as e:      65 s = 
e.java_exception.toString() 
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)     326 raise 
Py4JJavaError(     327 "An error occurred while calling {0}{1}{2}.\n". --> 328  
                   format(target_id, ".", name), value)
    329 else:     330 raise Py4JError( Py4JJavaError: An error occurred while 
calling o700.collectToPython. : org.apache.spark.SparkException: Job aborted 
due to stage failure: Task 58 in stage 141.0 failed 4 times, most recent 
failure: Lost task 58.3 in stage 141.0 (TID 76193, 10.139.64.12, executor 0): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
File "/databricks/spark/python/pyspark/worker.py", line 403, in main process() 
File "/databricks/spark/python/pyspark/worker.py", line 398, in process 
serializer.dump_stream(func(split_index, iterator), outfile) File 
"/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream vs 
= list(itertools.islice(iterator, batch)) File 
"/databricks/spark/python/pyspark/util.py", line 99, in wrapper return f(*args, 
**kwargs) File "/databricks/spark/python/pyspark/sql/session.py", line 785, in 
prepare verify_func(obj) File "/databricks/spark/python/pyspark/sql/types.py", 
line 1389, in verify verify_value(obj) File 
"/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct 
verifier(v) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in 
verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", 
line 1383, in verify_default verify_acceptable_types(obj) File 
"/databricks/spark/python/pyspark/sql/types.py", line 1278, in 
verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field 
record_count: LongType can not accept object 'n/a' in type <class 'str'> at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
 at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626) 
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) 
at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
 at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
 at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
 at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) 
at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) 
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at 
org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
org.apache.spark.scheduler.Task.run(Task.scala:112) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087) at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
 at scala.Option.foreach(Option.scala:257) at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873) at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2252) at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2350) at 
org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:234)
 at 
org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269) 
at 
org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69) 
at 
org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75) 
at 
org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
 at 
org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:469)
 at 
org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:312)
 at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3289) at 
org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3288) at 
org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423) at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
 at 
org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
 at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
 at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422)
 at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3288) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at 
py4j.Gateway.invoke(Gateway.java:295) at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at 
py4j.GatewayConnection.run(GatewayConnection.java:251) at 
java.lang.Thread.run(Thread.java:748) Caused by: 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
File "/databricks/spark/python/pyspark/worker.py", line 403, in main process() 
File "/databricks/spark/python/pyspark/worker.py", line 398, in process 
serializer.dump_stream(func(split_index, iterator), outfile) File 
"/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream vs 
= list(itertools.islice(iterator, batch)) File 
"/databricks/spark/python/pyspark/util.py", line 99, in wrapper return f(*args, 
**kwargs) File "/databricks/spark/python/pyspark/sql/session.py", line 785, in 
prepare verify_func(obj) File "/databricks/spark/python/pyspark/sql/types.py", 
line 1389, in verify verify_value(obj) File 
"/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct 
verifier(v) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in 
verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", 
line 1383, in verify_default verify_acceptable_types(obj) File 
"/databricks/spark/python/pyspark/sql/types.py", line 1278, in 
verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field 
record_count: LongType can not accept object 'n/a' in type <class 'str'> at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
 at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626) 
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) 
at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
 at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
 at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
 at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) 
at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) 
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at 
org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
org.apache.spark.scheduler.Task.run(Task.scala:112) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
... 1 more
{noformat}

  was:
The function "+create_dataframes_from_azure_responses_rdd+" receives 
*table_names* (_list_ of _str_) and *responses_rdd* (rdd of tuples <_str_, 
_str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It will 
then go ahead and iterate over the table names to create dataframes filtering 
the RDDs by the first element and valid response.

So far so good.

QueryResponse object (from azure.loganalytics package) contains, essentialy, a 
list with 1 "_table_" which in turn has a "_columns_" and a "_rows_" field. 
Every single response (fifth element of the tuple [4]) for the same table name 
(first element of the tuple [0]) has exactly the same columns in the same order 
(order is not important other thant to reference the rows data inside the same 
response anyways). The types are stored in *column_types* taking the first 
response as the sample.

Now to the tricky part.

I call flatMap on the *responses_rdd* with the function 
+"tabularize_response_rdd+" which basically creates a Row object for every row 
(_list_ of _str_) in the _QueryResponse_. I also create the schema based on a 
*type_map* from azure types to spark.sql.types in order to specify it to the 
subsequent createDataFrame instruction. If the result of this flatMap, 
*table_tabular_rdd*, is not cached before creating the DataFrame from the Rows 
RDD everything works smoothly. Nevertheless if the result of the flatMap, 
*table_tabular_rdd*, is cached the before creating the DataFrame a mismatch is 
evidenced between the actual key:values for the Row objects.

It would be good to point that when a Row Object is created from an unpacked 
dict the code in 
[[https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375]](here)
 sorts the keys; is this behaviour overriden somehow by caching?

Let me please know what I am doing wrong, is there any best practice / 
documented solution I am not following? Im just a beginner when it comes to 
Spark and would happily accept any suggestion. I hope I was clear enough, and I 
am open to give you any additional details that might be helpful. Thank you! 
(Code and error attached as well).

The error looks like if it was related to casting, but it can be seen that the 
contents do not correspond to the key. *record_count* key is actually a Long 
but in the Row it got somehow swapped for another key's value, in this case 
'n/a'.
{code:java}
def create_dataframes_from_azure_responses_rdd(table_names: list, 
responses_rdd: pyspark.rdd, verbose:bool=False) -> list:
  ws_column_name = "WorkspaceId"
  def tabularize_response_rdd(x: tuple):
    import pyspark
    tn, wsid, count, interval, response = x
    ret = []
    if response.tables[0].rows:
      ret = [pyspark.sql.Row(**{ws_column_name:wsid, **{fi.name:r[i] for i,fi 
in enumerate(response.tables[0].columns)}}) for r in response.tables[0].rows]
    return ret
  data_frames = {}
  for tn in table_names:
    if verbose: print("Filtering RDD items for {}".format(tn))
    table_response_rdd = responses_rdd.filter(lambda x: x[0]==tn and 
x[4]!=None).cache()
    
    data_frames[tn] = None
    if verbose: print("Checking if RDD for {} has data".format(tn))
    if not table_response_rdd.isEmpty():
      if verbose: print("Getting column types for {} from azure 
response".format(tn))
      column_types = {f.name:f.type for f in 
table_response_rdd.take(1)[0][4].tables[0].columns}
      column_types[ws_column_name] = "string"
      if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
      table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) 
#.cache() #Error with cache, no error without!
      if verbose: print("Getting sample row for {}".format(tn))
      row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
      if verbose: print("Building schema for {} from sample row and column 
types".format(tn))
      current_schema = StructType([StructField(f, type_map[column_types[f]](), 
True) for f in row_fields])
      if verbose: print("Creating dataframe for {}".format(tn))
      table_df = spark.createDataFrame(table_tabular_rdd, 
schema=current_schema).cache()
      if verbose: print("Calculating expected count for {}".format(tn))
      expected_count = table_response_rdd.map(lambda x: 
(x[1],x[2])).distinct().map(lambda x: x[1]).sum()
      real_count = 
table_df.select("record_count").groupBy().sum().collect()[0][0]
      table_response_rdd.unpersist()
      #table_tabular_rdd.unpersist()
      if verbose: print("Expected count {} vs Real count 
{}".format(expected_count, real_count))
      data_frames[tn]=table_df
    else:
      if verbose: print("{} table was empty!".format(tn))
  return data_frames
{code}
{noformat}
Py4JJavaError Traceback (most recent call last) <command-2824384765475765> in 
<module>()       1 resrdds = get_data_for_timespan_accross_laws(wss, tns, 1, 
sta, container, sta_key, tid, creds, 5000, True)       2 resrdds.cache() ----> 
3 dfs_raw = create_dataframes_from_azure_responses_rdd(tns, resrdds, True)      
 4 resrdds.unpersist() <command-2824384765475774> in 
create_dataframes_from_azure_responses_rdd(table_names, responses_rdd, verbose) 
     37 if verbose: print("Calculating expected count for {}".format(tn))      
38 expected_count = table_response_rdd.map(lambda x: 
(x[1],x[2])).distinct().map(lambda x: x[1]).sum() ---> 39       real_count = 
table_df.select("record_count").groupBy().sum().collect()[0][0]      40 
table_response_rdd.unpersist()      41 #table_tabular_rdd.unpersist() 
/databricks/spark/python/pyspark/sql/dataframe.py in collect(self)     546 # 
Default path used in OSS Spark / for non-DF-ACL clusters:     547 with 
SCCallSiteSync(self._sc) as css: --> 548             sock_info = 
self._jdf.collectToPython()     549 return list(_load_from_socket(sock_info, 
BatchedSerializer(PickleSerializer())))     550 
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in 
__call__(self, *args)    1255 answer = 
self.gateway_client.send_command(command)    1256 return_value = 
get_return_value( -> 1257             answer, self.gateway_client, 
self.target_id, self.name)
   1258    1259 for temp_arg in temp_args: 
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)      61 def 
deco(*a, **kw):      62 try: ---> 63             return f(*a, **kw)      64 
except py4j.protocol.Py4JJavaError as e:      65 s = 
e.java_exception.toString() 
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)     326 raise 
Py4JJavaError(     327 "An error occurred while calling {0}{1}{2}.\n". --> 328  
                   format(target_id, ".", name), value)
    329 else:     330 raise Py4JError( Py4JJavaError: An error occurred while 
calling o700.collectToPython. : org.apache.spark.SparkException: Job aborted 
due to stage failure: Task 58 in stage 141.0 failed 4 times, most recent 
failure: Lost task 58.3 in stage 141.0 (TID 76193, 10.139.64.12, executor 0): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
File "/databricks/spark/python/pyspark/worker.py", line 403, in main process() 
File "/databricks/spark/python/pyspark/worker.py", line 398, in process 
serializer.dump_stream(func(split_index, iterator), outfile) File 
"/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream vs 
= list(itertools.islice(iterator, batch)) File 
"/databricks/spark/python/pyspark/util.py", line 99, in wrapper return f(*args, 
**kwargs) File "/databricks/spark/python/pyspark/sql/session.py", line 785, in 
prepare verify_func(obj) File "/databricks/spark/python/pyspark/sql/types.py", 
line 1389, in verify verify_value(obj) File 
"/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct 
verifier(v) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in 
verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", 
line 1383, in verify_default verify_acceptable_types(obj) File 
"/databricks/spark/python/pyspark/sql/types.py", line 1278, in 
verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field 
record_count: LongType can not accept object 'n/a' in type <class 'str'> at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
 at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626) 
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) 
at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
 at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
 at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
 at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) 
at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) 
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at 
org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
org.apache.spark.scheduler.Task.run(Task.scala:112) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087) at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
 at scala.Option.foreach(Option.scala:257) at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873) at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2252) at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2350) at 
org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:234)
 at 
org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269) 
at 
org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69) 
at 
org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75) 
at 
org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
 at 
org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:469)
 at 
org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:312)
 at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3289) at 
org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3288) at 
org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423) at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
 at 
org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
 at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
 at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422)
 at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3288) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at 
py4j.Gateway.invoke(Gateway.java:295) at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at 
py4j.GatewayConnection.run(GatewayConnection.java:251) at 
java.lang.Thread.run(Thread.java:748) Caused by: 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
File "/databricks/spark/python/pyspark/worker.py", line 403, in main process() 
File "/databricks/spark/python/pyspark/worker.py", line 398, in process 
serializer.dump_stream(func(split_index, iterator), outfile) File 
"/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream vs 
= list(itertools.islice(iterator, batch)) File 
"/databricks/spark/python/pyspark/util.py", line 99, in wrapper return f(*args, 
**kwargs) File "/databricks/spark/python/pyspark/sql/session.py", line 785, in 
prepare verify_func(obj) File "/databricks/spark/python/pyspark/sql/types.py", 
line 1389, in verify verify_value(obj) File 
"/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct 
verifier(v) File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in 
verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", 
line 1383, in verify_default verify_acceptable_types(obj) File 
"/databricks/spark/python/pyspark/sql/types.py", line 1278, in 
verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field 
record_count: LongType can not accept object 'n/a' in type <class 'str'> at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
 at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626) 
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) 
at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
 at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
 at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
 at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) 
at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) 
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at 
org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
org.apache.spark.scheduler.Task.run(Task.scala:112) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
... 1 more
{noformat}


> Caching an RDD composed of Row Objects produces some kind of key recombination
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-27613
>                 URL: https://issues.apache.org/jira/browse/SPARK-27613
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Andres Fernandez
>            Priority: Major
>
> (Code included at the bottom)
> The function "+create_dataframes_from_azure_responses_rdd+" receives 
> *table_names* (_list_ of _str_) and *responses_rdd* (rdd of tuples <_str_, 
> _str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It will 
> then go ahead and iterate over the table names to create dataframes filtering 
> the RDDs by the first element and valid response.
> So far so good.
> QueryResponse object (from azure.loganalytics package) contains, essentialy, 
> a list with 1 "_table_" which in turn has a "_columns_" and a "_rows_" field. 
> Every single response (fifth element of the tuple [4]) for the same table 
> name (first element of the tuple [0]) has exactly the same columns in the 
> same order (order is not important other thant to reference the rows data 
> inside the same response anyways). The types are stored in *column_types* 
> taking the first response as the sample.
> Now to the tricky part.
> I call flatMap on the *responses_rdd* with the function 
> +"tabularize_response_rdd+" which basically creates a Row object for every 
> row (_list_ of _str_) in the _QueryResponse_. I also create the schema based 
> on a *type_map* from azure types to spark.sql.types in order to specify it to 
> the subsequent createDataFrame instruction. If the result of this flatMap, 
> *table_tabular_rdd*, is not cached before creating the DataFrame from the 
> Rows RDD everything works smoothly. Nevertheless if the result of the 
> flatMap, *table_tabular_rdd*, is cached the before creating the DataFrame a 
> mismatch is evidenced between the actual key:values for the Row objects.
> It would be good to point that when a Row Object is created from an unpacked 
> dict the code in 
> [[https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375]](here)
>  sorts the keys; is this behaviour overriden somehow by caching?
> Let me please know what I am doing wrong, is there any best practice / 
> documented solution I am not following? Im just a beginner when it comes to 
> Spark and would happily accept any suggestion. I hope I was clear enough, and 
> I am open to give you any additional details that might be helpful. Thank 
> you! (Code and error attached as well).
> The error looks like if it was related to casting, but it can be seen that 
> the contents do not correspond to the key. *record_count* key is actually a 
> Long but in the Row it got somehow swapped for another key's value, in this 
> case 'n/a'.
> {code:java}
> def create_dataframes_from_azure_responses_rdd(table_names: list, 
> responses_rdd: pyspark.rdd, verbose:bool=False) -> list:
>   ws_column_name = "WorkspaceId"
>   def tabularize_response_rdd(x: tuple):
>     import pyspark
>     tn, wsid, count, interval, response = x
>     ret = []
>     if response.tables[0].rows:
>       ret = [pyspark.sql.Row(**{ws_column_name:wsid, **{fi.name:r[i] for i,fi 
> in enumerate(response.tables[0].columns)}}) for r in response.tables[0].rows]
>     return ret
>   data_frames = {}
>   for tn in table_names:
>     if verbose: print("Filtering RDD items for {}".format(tn))
>     table_response_rdd = responses_rdd.filter(lambda x: x[0]==tn and 
> x[4]!=None).cache()
>     
>     data_frames[tn] = None
>     if verbose: print("Checking if RDD for {} has data".format(tn))
>     if not table_response_rdd.isEmpty():
>       if verbose: print("Getting column types for {} from azure 
> response".format(tn))
>       column_types = {f.name:f.type for f in 
> table_response_rdd.take(1)[0][4].tables[0].columns}
>       column_types[ws_column_name] = "string"
>       if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
>       table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) 
> #.cache() #Error with cache, no error without!
>       if verbose: print("Getting sample row for {}".format(tn))
>       row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
>       if verbose: print("Building schema for {} from sample row and column 
> types".format(tn))
>       current_schema = StructType([StructField(f, 
> type_map[column_types[f]](), True) for f in row_fields])
>       if verbose: print("Creating dataframe for {}".format(tn))
>       table_df = spark.createDataFrame(table_tabular_rdd, 
> schema=current_schema).cache()
>       if verbose: print("Calculating expected count for {}".format(tn))
>       expected_count = table_response_rdd.map(lambda x: 
> (x[1],x[2])).distinct().map(lambda x: x[1]).sum()
>       real_count = 
> table_df.select("record_count").groupBy().sum().collect()[0][0]
>       table_response_rdd.unpersist()
>       #table_tabular_rdd.unpersist()
>       if verbose: print("Expected count {} vs Real count 
> {}".format(expected_count, real_count))
>       data_frames[tn]=table_df
>     else:
>       if verbose: print("{} table was empty!".format(tn))
>   return data_frames
> {code}
> {noformat}
> Py4JJavaError Traceback (most recent call last) <command-2824384765475765> in 
> <module>()       1 resrdds = get_data_for_timespan_accross_laws(wss, tns, 1, 
> sta, container, sta_key, tid, creds, 5000, True)       2 resrdds.cache() 
> ----> 3 dfs_raw = create_dataframes_from_azure_responses_rdd(tns, resrdds, 
> True)       4 resrdds.unpersist() <command-2824384765475774> in 
> create_dataframes_from_azure_responses_rdd(table_names, responses_rdd, 
> verbose)      37 if verbose: print("Calculating expected count for 
> {}".format(tn))      38 expected_count = table_response_rdd.map(lambda x: 
> (x[1],x[2])).distinct().map(lambda x: x[1]).sum() ---> 39       real_count = 
> table_df.select("record_count").groupBy().sum().collect()[0][0]      40 
> table_response_rdd.unpersist()      41 #table_tabular_rdd.unpersist() 
> /databricks/spark/python/pyspark/sql/dataframe.py in collect(self)     546 # 
> Default path used in OSS Spark / for non-DF-ACL clusters:     547 with 
> SCCallSiteSync(self._sc) as css: --> 548             sock_info = 
> self._jdf.collectToPython()     549 return list(_load_from_socket(sock_info, 
> BatchedSerializer(PickleSerializer())))     550 
> /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in 
> __call__(self, *args)    1255 answer = 
> self.gateway_client.send_command(command)    1256 return_value = 
> get_return_value( -> 1257             answer, self.gateway_client, 
> self.target_id, self.name)
>    1258    1259 for temp_arg in temp_args: 
> /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)      61 def 
> deco(*a, **kw):      62 try: ---> 63             return f(*a, **kw)      64 
> except py4j.protocol.Py4JJavaError as e:      65 s = 
> e.java_exception.toString() 
> /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in 
> get_return_value(answer, gateway_client, target_id, name)     326 raise 
> Py4JJavaError(     327 "An error occurred while calling {0}{1}{2}.\n". --> 
> 328                     format(target_id, ".", name), value)
>     329 else:     330 raise Py4JError( Py4JJavaError: An error occurred while 
> calling o700.collectToPython. : org.apache.spark.SparkException: Job aborted 
> due to stage failure: Task 58 in stage 141.0 failed 4 times, most recent 
> failure: Lost task 58.3 in stage 141.0 (TID 76193, 10.139.64.12, executor 0): 
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last): File "/databricks/spark/python/pyspark/worker.py", line 403, in main 
> process() File "/databricks/spark/python/pyspark/worker.py", line 398, in 
> process serializer.dump_stream(func(split_index, iterator), outfile) File 
> "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream 
> vs = list(itertools.islice(iterator, batch)) File 
> "/databricks/spark/python/pyspark/util.py", line 99, in wrapper return 
> f(*args, **kwargs) File "/databricks/spark/python/pyspark/sql/session.py", 
> line 785, in prepare verify_func(obj) File 
> "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify 
> verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 
> 1370, in verify_struct verifier(v) File 
> "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify 
> verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 
> 1383, in verify_default verify_acceptable_types(obj) File 
> "/databricks/spark/python/pyspark/sql/types.py", line 1278, in 
> verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field 
> record_count: LongType can not accept object 'n/a' in type <class 'str'> at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
>  at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626) 
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) 
> at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
>  at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at 
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source) at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
>  at 
> org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
>  at 
> org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
>  at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) 
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at 
> org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
> org.apache.spark.scheduler.Task.run(Task.scala:112) at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100)
>  at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088)
>  at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087) 
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
>  at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
>  at scala.Option.foreach(Option.scala:257) at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076)
>  at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319)
>  at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267)
>  at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873) at 
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2252) at 
> org.apache.spark.SparkContext.runJob(SparkContext.scala:2350) at 
> org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:234)
>  at 
> org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269) 
> at 
> org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69) 
> at 
> org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75) 
> at 
> org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
>  at 
> org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:469)
>  at 
> org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:312)
>  at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3289) at 
> org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3288) at 
> org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423) at 
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
>  at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422)
>  at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3288) at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at 
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at 
> py4j.Gateway.invoke(Gateway.java:295) at 
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at 
> py4j.commands.CallCommand.execute(CallCommand.java:79) at 
> py4j.GatewayConnection.run(GatewayConnection.java:251) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last): File "/databricks/spark/python/pyspark/worker.py", line 403, in main 
> process() File "/databricks/spark/python/pyspark/worker.py", line 398, in 
> process serializer.dump_stream(func(split_index, iterator), outfile) File 
> "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream 
> vs = list(itertools.islice(iterator, batch)) File 
> "/databricks/spark/python/pyspark/util.py", line 99, in wrapper return 
> f(*args, **kwargs) File "/databricks/spark/python/pyspark/sql/session.py", 
> line 785, in prepare verify_func(obj) File 
> "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify 
> verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 
> 1370, in verify_struct verifier(v) File 
> "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify 
> verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 
> 1383, in verify_default verify_acceptable_types(obj) File 
> "/databricks/spark/python/pyspark/sql/types.py", line 1278, in 
> verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field 
> record_count: LongType can not accept object 'n/a' in type <class 'str'> at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
>  at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626) 
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609) 
> at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
>  at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at 
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source) at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
>  at 
> org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
>  at 
> org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
>  at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096) at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883) 
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:302) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:304) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at 
> org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
> org.apache.spark.scheduler.Task.run(Task.scala:112) at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ... 1 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to