[ https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andres Fernandez updated SPARK-27613:
-------------------------------------
    Component/s:     (was: Spark Core)
                     PySpark

> Caching an RDD composed of Row Objects produces some kind of key recombination
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-27613
>                 URL: https://issues.apache.org/jira/browse/SPARK-27613
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: Andres Fernandez
>            Priority: Major
>
> (Code included at the bottom)
> The function "+create_dataframes_from_azure_responses_rdd+" receives *table_names* (a _list_ of _str_) and *responses_rdd* (an RDD of tuples <_str_, _str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It then iterates over the table names and creates a DataFrame for each one, filtering the RDD by the first element and keeping only valid responses.
> So far so good.
> The QueryResponse object (from the azure.loganalytics package) essentially contains a list with one "_table_", which in turn has a "_columns_" and a "_rows_" field. Every single response (fifth element of the tuple, [4]) for the same table name (first element of the tuple, [0]) has exactly the same columns in the same order (the order only matters for referencing the row data within the same response anyway). The types are stored in *column_types*, taking the first response as the sample.
> Now to the tricky part.
> I call flatMap on *responses_rdd* with the function "+tabularize_response_rdd+", which basically creates a Row object for every row (a _list_ of _str_) in the _QueryResponse_. I also create the schema, based on a *type_map* from Azure types to spark.sql.types, in order to pass it to the subsequent createDataFrame call. If the result of this flatMap, *table_tabular_rdd*, is not cached before creating the DataFrame from the Rows RDD, everything works smoothly. Nevertheless, if the result of the flatMap, *table_tabular_rdd*, is cached before creating the DataFrame, a mismatch appears between the actual keys and values of the Row objects.
> It is worth pointing out that when a Row object is created from an unpacked dict, the code [here|https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375] sorts the keys; is this behaviour somehow overridden by caching?
> Please let me know what I am doing wrong: is there any best practice or documented solution I am not following? I'm just a beginner when it comes to Spark and will happily accept any suggestion. I hope I was clear enough, and I am open to providing any additional details that might be helpful. Thank you! (Code and error attached as well.)
> The error looks as if it were related to casting, but it can be seen that the contents do not correspond to the key: the *record_count* key is actually a Long, but in the Row it somehow got swapped with another key's value, in this case 'n/a'.
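> As a quick, self-contained illustration of the key-sorting behaviour referenced above (this is only a sketch, not part of the failing job; Spark 2.4.x is assumed and the column names are made up):
> {code:python}
> from pyspark.sql import Row
>
> # In Spark 2.4, Row(**kwargs) sorts the field names alphabetically, so the
> # positional order of the values can differ from the order in which the
> # keyword arguments were written.
> r = Row(record_count=10, WorkspaceId="abc", status="n/a")
> print(r.__fields__)  # ['WorkspaceId', 'record_count', 'status']
> print(tuple(r))      # ('abc', 10, 'n/a')
>
> # One way to pin an explicit field order is a Row "factory": the field names
> # are fixed up front and the values are filled in positionally, not sorted.
> LogRow = Row("WorkspaceId", "record_count", "status")
> r2 = LogRow("abc", 10, "n/a")
> print(r2)  # Row(WorkspaceId='abc', record_count=10, status='n/a')
> {code}
> Whether that sorted order still lines up with the supplied schema once the flatMap result has been cached (i.e. once the Row objects have been serialized and rebuilt) is exactly the behaviour in question; the full reproduction and the resulting traceback follow.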
> {code:python}
> def create_dataframes_from_azure_responses_rdd(table_names: list, responses_rdd: pyspark.rdd, verbose: bool = False) -> list:
>     ws_column_name = "WorkspaceId"
>     def tabularize_response_rdd(x: tuple):
>         import pyspark
>         tn, wsid, count, interval, response = x
>         ret = []
>         if response.tables[0].rows:
>             ret = [pyspark.sql.Row(**{ws_column_name: wsid, **{fi.name: r[i] for i, fi in enumerate(response.tables[0].columns)}}) for r in response.tables[0].rows]
>         return ret
>     data_frames = {}
>     for tn in table_names:
>         if verbose: print("Filtering RDD items for {}".format(tn))
>         table_response_rdd = responses_rdd.filter(lambda x: x[0]==tn and x[4]!=None).cache()
>
>         data_frames[tn] = None
>         if verbose: print("Checking if RDD for {} has data".format(tn))
>         if not table_response_rdd.isEmpty():
>             if verbose: print("Getting column types for {} from azure response".format(tn))
>             column_types = {f.name: f.type for f in table_response_rdd.take(1)[0][4].tables[0].columns}
>             column_types[ws_column_name] = "string"
>             if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
>             table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) #.cache() #Error with cache, no error without!
>             if verbose: print("Getting sample row for {}".format(tn))
>             row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
>             if verbose: print("Building schema for {} from sample row and column types".format(tn))
>             current_schema = StructType([StructField(f, type_map[column_types[f]](), True) for f in row_fields])
>             if verbose: print("Creating dataframe for {}".format(tn))
>             table_df = spark.createDataFrame(table_tabular_rdd, schema=current_schema).cache()
>             if verbose: print("Calculating expected count for {}".format(tn))
>             expected_count = table_response_rdd.map(lambda x: (x[1],x[2])).distinct().map(lambda x: x[1]).sum()
>             real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
>             table_response_rdd.unpersist()
>             #table_tabular_rdd.unpersist()
>             if verbose: print("Expected count {} vs Real count {}".format(expected_count, real_count))
>             data_frames[tn] = table_df
>         else:
>             if verbose: print("{} table was empty!".format(tn))
>     return data_frames
> {code}
> {noformat}
> Py4JJavaError                             Traceback (most recent call last)
> <command-2824384765475765> in <module>()
>       1 resrdds = get_data_for_timespan_accross_laws(wss, tns, 1, sta, container, sta_key, tid, creds, 5000, True)
>       2 resrdds.cache()
> ----> 3 dfs_raw = create_dataframes_from_azure_responses_rdd(tns, resrdds, True)
>       4 resrdds.unpersist()
>
> <command-2824384765475774> in create_dataframes_from_azure_responses_rdd(table_names, responses_rdd, verbose)
>      37             if verbose: print("Calculating expected count for {}".format(tn))
>      38             expected_count = table_response_rdd.map(lambda x: (x[1],x[2])).distinct().map(lambda x: x[1]).sum()
> ---> 39             real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
>      40             table_response_rdd.unpersist()
>      41             #table_tabular_rdd.unpersist()
>
> /databricks/spark/python/pyspark/sql/dataframe.py in collect(self)
>     546         # Default path used in OSS Spark / for non-DF-ACL clusters:
>     547         with SCCallSiteSync(self._sc) as css:
> --> 548             sock_info = self._jdf.collectToPython()
>     549         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
>     550
>
> /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
>    1255         answer = self.gateway_client.send_command(command)
>    1256         return_value = get_return_value(
> -> 1257             answer, self.gateway_client, self.target_id, self.name)
>    1258
>    1259         for temp_arg in temp_args:
>
> /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>      61     def deco(*a, **kw):
>      62         try:
> ---> 63             return f(*a, **kw)
>      64         except py4j.protocol.Py4JJavaError as e:
>      65             s = e.java_exception.toString()
>
> /databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     326                 raise Py4JJavaError(
>     327                     "An error occurred while calling {0}{1}{2}.\n".
> --> 328                     format(target_id, ".", name), value)
>     329             else:
>     330                 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o700.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 141.0 failed 4 times, most recent failure: Lost task 58.3 in stage 141.0 (TID 76193, 10.139.64.12, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/databricks/spark/python/pyspark/worker.py", line 403, in main
>     process()
>   File "/databricks/spark/python/pyspark/worker.py", line 398, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
>     return f(*args, **kwargs)
>   File "/databricks/spark/python/pyspark/sql/session.py", line 785, in prepare
>     verify_func(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct
>     verifier(v)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1383, in verify_default
>     verify_acceptable_types(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types
>     % (dataType, obj, type(obj))))
> TypeError: field record_count: LongType can not accept object 'n/a' in type <class 'str'>
>
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
> 	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
> 	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
> 	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
> 	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
> 	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096)
> 	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161)
> 	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883)
> 	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:302)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> 	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:112)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2100)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2088)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2087)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2087)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1076)
> 	at scala.Option.foreach(Option.scala:257)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1076)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2319)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2267)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2255)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:873)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2252)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
> 	at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:234)
> 	at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:269)
> 	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69)
> 	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:75)
> 	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:497)
> 	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:469)
> 	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:312)
> 	at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3289)
> 	at org.apache.spark.sql.Dataset$$anonfun$51.apply(Dataset.scala:3288)
> 	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423)
> 	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
> 	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
> 	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3422)
> 	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3288)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
> 	at py4j.Gateway.invoke(Gateway.java:295)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.GatewayConnection.run(GatewayConnection.java:251)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/databricks/spark/python/pyspark/worker.py", line 403, in main
>     process()
>   File "/databricks/spark/python/pyspark/worker.py", line 398, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/databricks/spark/python/pyspark/serializers.py", line 413, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
>     return f(*args, **kwargs)
>   File "/databricks/spark/python/pyspark/sql/session.py", line 785, in prepare
>     verify_func(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1370, in verify_struct
>     verifier(v)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1389, in verify
>     verify_value(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1383, in verify_default
>     verify_acceptable_types(obj)
>   File "/databricks/spark/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types
>     % (dataType, obj, type(obj))))
> TypeError: field record_count: LongType can not accept object 'n/a' in type <class 'str'>
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
> 	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
> 	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:444)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:133)
> 	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
> 	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1170)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1161)
> 	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1096)
> 	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1161)
> 	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:883)
> 	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:351)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:302)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> 	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:112)
> 	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1432)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	... 1 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)