[ https://issues.apache.org/jira/browse/SPARK-34693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304266#comment-17304266 ]
Laurens commented on SPARK-34693:
---------------------------------

{code:python}
# Imports assumed by this snippet:
from collections import deque
from functools import reduce as _reduce

import pandas as pd
from toposort import toposort, CircularDependencyError


def tps(df) -> pd.DataFrame["workflow_id": int, "task_id": int, "task_slack": int]:
    df.set_index("id", inplace=True)
    graph = dict()
    forward_dict = dict()
    finish_times = dict()  # task finish times (not populated in this snippet)
    task_runtimes = dict()
    task_arrival_times = dict()
    workflow_id = None
    for row in df.to_records():
        # 0: task id, 1: wf id, 2: children, 3: parents, 4: ts submit, 5: runtime
        graph[row[0]] = set(row[3].flatten())
        forward_dict[row[0]] = set(row[2].flatten())
        task_runtimes[row[0]] = row[5]
        task_arrival_times[row[0]] = row[4]
        workflow_id = row[1]
    del df
    del row

    try:
        groups = list(toposort(graph))
    except CircularDependencyError:
        del forward_dict
        del finish_times
        del task_runtimes
        del task_arrival_times
        del workflow_id
        return pd.DataFrame(columns=["workflow_id", "task_id", "task_slack"])
    del graph

    if len(groups) < 2:
        del forward_dict
        del finish_times
        del task_runtimes
        del task_arrival_times
        del workflow_id
        del groups
        return pd.DataFrame(columns=["workflow_id", "task_id", "task_slack"])

    # Compute all full paths
    max_for_task = dict()
    q = deque()
    for i in groups[0]:
        max_for_task[i] = task_arrival_times[i] + task_runtimes[i]
        q.append((max_for_task[i], [i]))

    paths = []
    max_path = -1
    while len(q) > 0:
        time, path = q.popleft()  # get partial path
        # We are at time t[0] having travelled path t[1]
        if len(forward_dict[path[-1]]) == 0:  # End task
            if time < max_path:
                continue  # smaller path, cannot be a CP
            if time > max_path:
                # If we have a new max, clear the list and set it to the max
                max_path = time
                paths.clear()
            paths.append(path)  # Add new and identical-length paths
        else:
            # Loop over all children of the last task in the path we took
            for c in forward_dict[path[-1]]:
                if c in task_runtimes:
                    # Special case: we find a child that arrives later than the current path.
                    # Start a new path from this child onwards and do not mark the previous nodes
                    # as being on the critical path, as they can actually delay.
                    if time < task_arrival_times[c]:
                        if c not in max_for_task:
                            max_for_task[c] = task_arrival_times[c] + task_runtimes[c]
                            q.append((max_for_task[c], [c]))
                    else:
                        # If finishing one of the children from here causes the same or a new maximum,
                        # add a path to the queue to explore from there on at that time.
                        child_finish = time + task_runtimes[c]
                        if c in max_for_task and child_finish < max_for_task[c]:
                            continue
                        max_for_task[c] = child_finish
                        l = path.copy()
                        l.append(c)
                        q.append((child_finish, l))
    del time
    del max_for_task
    del forward_dict
    del max_path
    del q
    del path

    if len(paths) == 1:
        critical_path_tasks = set(paths[0])
    else:
        critical_path_tasks = _reduce(set.union, [set(cp) for cp in paths])
    del paths

    rows = deque()
    for i in range(len(groups)):
        max_in_group = -1
        for task_id in groups[i]:
            # Compute max finish time in the group
            if task_id not in finish_times:
                continue  # Task was not in snapshot of trace
            task_finish = finish_times[task_id]
            if task_finish > max_in_group:
                max_in_group = task_finish
        for task_id in groups[i]:
            if task_id in critical_path_tasks:
                # Task is on a critical path, so its slack is 0
                rows.append([workflow_id, task_id, 0])
            else:
                if task_id not in finish_times:
                    continue  # Task was not in snapshot of trace
                rows.append([workflow_id, task_id, max_in_group - finish_times[task_id]])
    del groups
    del critical_path_tasks
    del finish_times
    return pd.DataFrame(rows, columns=["workflow_id", "task_id", "task_slack"])
{code}
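As a side note on the structure the function builds: graph maps each task id to the set of its parent ids, which is the item-to-set-of-dependencies shape the toposort package expects, so list(toposort(graph)) yields generations of tasks whose parents all sit in earlier generations. A tiny illustration with made-up task ids (not taken from the trace):
{code:python}
# Illustration only, hypothetical task ids: graph maps task -> set of its parents.
from toposort import toposort

graph = {2: {1}, 3: {1}, 4: {2, 3}}   # task 1 has no parents
print(list(toposort(graph)))          # [{1}, {2, 3}, {4}]
{code}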
and the reading part:
{code:python}
# Assumed imports for this snippet:
import os
import databricks.koalas as ks

kdf = ks.read_parquet(os.path.join(hdfs_path, folder, "tasks", "schema-1.0"),
                      columns=[
                          "workflow_id",
                          "id",
                          "task_id",
                          "children",
                          "parents",
                          "ts_submit",
                          "runtime"
                      ],
                      pandas_metadata=False,
                      engine='pyarrow')

# Keep only tasks that are part of a dependency structure.
kdf = kdf[(kdf['children'].map(len) > 0) | (kdf['parents'].map(len) > 0)]
kdf = kdf.astype({"ts_submit": 'int32', "runtime": 'int32'})
kdf.dropna(inplace=True)

grouped_df = kdf.groupby("workflow_id")
grouped_df.apply(tps).to_parquet(output, compression='snappy', engine='pyarrow')
{code}
The HDFS path points to the dataset at [https://doi.org/10.5281/zenodo.3254606]. I have since changed the implementation of tps because it was too memory-hungry (I moved from the iterative approach to a recursive one) and I have not seen the crash since.

> Support null in conversions to and from Arrow
> ---------------------------------------------
>
>                 Key: SPARK-34693
>                 URL: https://issues.apache.org/jira/browse/SPARK-34693
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.1.1
>            Reporter: Laurens
>            Priority: Minor
>
> Looks like a regression from or related to SPARK-33489.
> I got the following running Spark 3.1.1 with Koalas 1.7.0:
> {code:java}
> TypeError                                 Traceback (most recent call last)
> /var/scratch/miniconda3/lib/python3.8/site-packages/pyspark/sql/udf.py in returnType(self)
>     100         try:
> --> 101             to_arrow_type(self._returnType_placeholder)
>     102         except TypeError:
>
> /var/scratch/miniconda3/lib/python3.8/site-packages/pyspark/sql/pandas/types.py in to_arrow_type(dt)
>      75     else:
> ---> 76         raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
>      77     return arrow_type
>
> TypeError: Unsupported type in conversion to Arrow: NullType
> {code}
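For anyone who wants to poke at the failure without the dataset: the conversion step shown in the stack trace can be exercised directly. This is only a minimal sketch against PySpark 3.1.1 (the same call that appears in the traceback), not the full Koalas job:
{code:python}
# Minimal sketch: call the Arrow conversion from the traceback directly.
# On PySpark 3.1.1 this raises: TypeError: Unsupported type in conversion to Arrow: NullType
from pyspark.sql.types import NullType
from pyspark.sql.pandas.types import to_arrow_type

to_arrow_type(NullType())
{code}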