[ https://issues.apache.org/jira/browse/SPARK-34693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304266#comment-17304266 ]

Laurens commented on SPARK-34693:
---------------------------------

{code:java}
# Required imports (toposort is the PyPI "toposort" package; _reduce is functools.reduce)
import pandas as pd
from collections import deque
from functools import reduce as _reduce
from toposort import toposort, CircularDependencyError


def tps(df) -> pd.DataFrame["workflow_id": int, "task_id": int, "task_slack": int]:
    df.set_index("id", inplace=True)
  
    graph = dict()
    forward_dict = dict()
    finish_times = dict()
    task_runtimes = dict()
    task_arrival_times = dict()
    workflow_id = None
    for row in df.to_records():  # 0: task id, 1: wf id, 2: children, 3: parents, 4: ts submit, 5: runtime
        graph[row[0]] = set(row[3].flatten())
        forward_dict[row[0]] = set(row[2].flatten())
        task_runtimes[row[0]] = row[5]
        task_arrival_times[row[0]] = row[4]
        workflow_id = row[1]
        
    del df
    del row
    
    try:
        groups = list(toposort(graph))
    except CircularDependencyError:
        del forward_dict
        del finish_times
        del task_runtimes
        del task_arrival_times
        del workflow_id
        return pd.DataFrame(columns=["workflow_id", "task_id", "task_slack"])
    
    del graph

    if len(groups) < 2:
        del forward_dict
        del finish_times
        del task_runtimes
        del task_arrival_times
        del workflow_id
        del groups
        return pd.DataFrame(columns=["workflow_id", "task_id", "task_slack"])

    # Compute all full paths
    max_for_task = dict()
    
    q = deque()
    for i in groups[0]:
        max_for_task[i] = task_arrival_times[i] + task_runtimes[i]
        q.append((max_for_task[i], [i]))
    
    paths = []
    max_path = -1
    
    while len(q) > 0:
        time, path = q.popleft()  # Get a partial path: we are at time `time` having travelled `path`
        if len(forward_dict[path[-1]]) == 0: # End task
            if time < max_path: continue  # smaller path, cannot be a CP
            if time > max_path:  # If we have a new max, clear the list and set it to the max
                max_path = time
                paths.clear()
            paths.append(path)  # Add new and identical length paths
        else:
            for c in forward_dict[path[-1]]:  # Loop over all children of the last task in the path we took
                if c in task_runtimes:
                    # Special case: we find a child that arrives later than the current path finishes.
                    # Start a new path from this child onwards and do not mark the previous nodes
                    # as being on the critical path, since they can actually be delayed.
                    if time < task_arrival_times[c]:
                        if c not in max_for_task:
                            max_for_task[c] = task_arrival_times[c] + task_runtimes[c]
                            q.append((max_for_task[c], [c]))
                    else:
                        # If the child's finish time (current time + its runtime) matches or exceeds
                        # the maximum seen for it so far, add a path to the queue to explore from
                        # there on at that time.
                        child_finish = time + task_runtimes[c]
                        if c in max_for_task and child_finish < max_for_task[c]:
                            continue
                        max_for_task[c] = child_finish
                        l = path.copy()
                        l.append(c)
                        q.append((child_finish, l))
    
    del time
    del max_for_task
    del forward_dict
    del max_path
    del q
    del path
    
    if len(paths) == 1:
        critical_path_tasks = set(paths[0])
    else:
        critical_path_tasks = _reduce(set.union, [set(cp) for cp in paths])
    del paths
    
    rows = deque()
    for i in range(len(groups)):
        max_in_group = -1
        for task_id in groups[i]:  # Compute max finish time in the group
            if task_id not in finish_times: 
                continue  # Task was not in snapshot of trace
            task_finish = finish_times[task_id]
            if task_finish > max_in_group:
                max_in_group = task_finish
        
        for task_id in groups[i]:
            if task_id in critical_path_tasks:  # Tasks on a critical path have zero slack
                rows.append([workflow_id, task_id, 0])
            else: 
                if task_id not in finish_times:
                    continue  # Task was not in snapshot of trace
                rows.append([workflow_id, task_id, max_in_group - finish_times[task_id]])
    
    del groups
    del critical_path_tasks
    del finish_times
    return pd.DataFrame(rows, columns=["workflow_id", "task_id", "task_slack"])
{code}
and the reading part:
{code:java}
# Required imports (ks is databricks.koalas; hdfs_path, folder and output are defined elsewhere)
import os

import databricks.koalas as ks

kdf = ks.read_parquet(
    os.path.join(hdfs_path, folder, "tasks", "schema-1.0"),
    columns=["workflow_id", "id", "task_id", "children", "parents", "ts_submit", "runtime"],
    pandas_metadata=False,
    engine='pyarrow',
)

kdf = kdf[(kdf['children'].map(len) > 0) | (kdf['parents'].map(len) > 0)]
kdf = kdf.astype({"ts_submit": 'int32', "runtime": 'int32'})
kdf.dropna(inplace=True)
grouped_df = kdf.groupby("workflow_id")
grouped_df.apply(tps).to_parquet(output, compression='snappy', engine='pyarrow')
{code}
The HDFS path points to [https://doi.org/10.5281/zenodo.3254606] as the dataset. I have since changed the implementation of tps because it was too memory hungry (moved from the iterative approach above to a recursive one), and I have not seen the crash since.
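
Roughly, the recursive variant looks like the sketch below. This is illustrative only, not the exact code I run now: the function and helper names are made up, it takes the same dicts the iterative version builds (parents, children, arrival times, runtimes), and it uses the standard critical-path notion of slack (latest finish minus earliest finish) rather than the per-topological-level slack above.
{code:python}
import sys
from functools import lru_cache

import pandas as pd


def tps_recursive(parents, children, arrival, runtime, workflow_id):
    # parents/children: task id -> set of task ids; arrival/runtime: task id -> int.
    # Cycle detection is omitted for brevity; deep chains need a higher recursion limit.
    sys.setrecursionlimit(max(10000, 2 * len(runtime)))

    @lru_cache(maxsize=None)
    def earliest_finish(task):
        # A task starts once it has arrived and every parent present in the snapshot has finished.
        start = arrival[task]
        for p in parents[task]:
            if p in runtime:
                start = max(start, earliest_finish(p))
        return start + runtime[task]

    makespan = max(earliest_finish(t) for t in runtime)

    @lru_cache(maxsize=None)
    def latest_finish(task):
        # Latest time this task may finish without delaying the workflow makespan.
        kids = [c for c in children[task] if c in runtime]
        if not kids:
            return makespan
        return min(latest_finish(c) - runtime[c] for c in kids)

    rows = [(workflow_id, t, latest_finish(t) - earliest_finish(t)) for t in runtime]
    return pd.DataFrame(rows, columns=["workflow_id", "task_id", "task_slack"])
{code}
Memoizing one finish time per task instead of keeping a queue of partial paths is what brought the memory use down.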

> Support null in conversions to and from Arrow
> ---------------------------------------------
>
>                 Key: SPARK-34693
>                 URL: https://issues.apache.org/jira/browse/SPARK-34693
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.1.1
>            Reporter: Laurens
>            Priority: Minor
>
> Looks like a regression from or related to SPARK-33489.
> I got the following error running Spark 3.1.1 with Koalas 1.7.0:
> {code:java}
> TypeError                                 Traceback (most recent call last)
> /var/scratch/miniconda3/lib/python3.8/site-packages/pyspark/sql/udf.py in returnType(self)
>     100             try:
> --> 101                 to_arrow_type(self._returnType_placeholder)
>     102             except TypeError:
> /var/scratch/miniconda3/lib/python3.8/site-packages/pyspark/sql/pandas/types.py in to_arrow_type(dt)
>      75     else:
> ---> 76         raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
>      77     return arrow_type
> TypeError: Unsupported type in conversion to Arrow: NullType
> {code}


