[ 
https://issues.apache.org/jira/browse/DATAFU-150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Russell Jurney updated DATAFU-150:
----------------------------------
    Description: 
I have created the following code in Python to one-hot encode multilabel data 
and would like to add it to DataFu:
{code:python}
questions_tags = filtered_lists.map(
    lambda x: Row(
        _Body=x[0],
        _Tags=x[1]
    )
).toDF()

questions_tags.show()

# Create an index for each multilabel tag: collect every tag to the driver,
# sort for a stable ordering, then number the tags
enumerated_labels = list(
    enumerate(
        sorted(
            remaining_tags_df.rdd
            .groupBy(lambda x: 1)
            .flatMap(lambda x: [y.tag for y in x[1]])
            .collect()
        )
    )
)
tag_index = {x: i for i, x in enumerated_labels}
index_tag = {i: x for i, x in enumerated_labels}

def one_hot_encode(tag_list, enumerated_labels):
    """PySpark can't one-hot-encode multilabel data, so we do it ourselves."""
    one_hot_row = []
    for i, label in enumerated_labels:
        # Emit a 1 for every known tag that appears in this record's tag list
        if index_tag[i] in tag_list:
            one_hot_row.append(1)
        else:
            one_hot_row.append(0)
    assert len(one_hot_row) == len(enumerated_labels)
    return one_hot_row

# One-hot encode each question's tags; the resulting DataFrame is what gets
# written to S3 as a Parquet file
one_hot_questions = questions_tags.rdd.map(
    lambda x: Row(
        _Body=x._Body,
        _Tags=one_hot_encode(x._Tags, enumerated_labels)
    )
)

# Create a DataFrame with an explicit schema for persisting as Parquet
schema = T.StructType([
    T.StructField("_Body", T.ArrayType(
        T.StringType()
    )),
    T.StructField("_Tags", T.ArrayType(
        T.IntegerType()
    ))
])
one_hot_df = spark.createDataFrame(
    one_hot_questions,
    schema
)
one_hot_df.show()
{code}

Which shows:

{code}
+--------------------+--------------------+
|               _Body|               _Tags|
+--------------------+--------------------+
|[Convert, Decimal...|[0, 0, 0, 0, 0, 0...|
|[Percentage, widt...|[0, 0, 0, 0, 0, 0...|
|[How, I, calculat...|[0, 1, 0, 0, 0, 0...|
|[Calculate, relat...|[0, 0, 0, 0, 0, 0...|
|[Determine, user,...|[0, 0, 0, 0, 0, 0...|
|[Difference, Math...|[0, 1, 0, 0, 0, 0...|
|[Filling, DataSet...|[0, 0, 1, 0, 0, 0...|
|[Binary, Data, My...|[0, 0, 0, 0, 0, 0...|
|[What, fastest, w...|[0, 0, 0, 0, 0, 0...|
|[Throw, error, My...|[0, 0, 0, 0, 0, 0...|
|[How, use, C, soc...|[0, 0, 0, 0, 0, 0...|
|[Unloading, ByteA...|[0, 0, 0, 0, 0, 0...|
|[Check, changes, ...|[0, 0, 0, 0, 0, 0...|
|[Reliable, timer,...|[0, 1, 0, 0, 0, 0...|
|[Best, way, allow...|[0, 0, 0, 0, 0, 0...|
|[Multiple, submit...|[0, 0, 0, 0, 0, 0...|
|[How, I, get, dis...|[0, 0, 1, 0, 0, 0...|
|[Paging, collecti...|[0, 0, 1, 0, 0, 0...|
|[How, I, add, exi...|[0, 0, 0, 0, 0, 0...|
|[Getting, Subclip...|[0, 0, 0, 0, 0, 0...|
+--------------------+--------------------+
{code}
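
For DataFu this could be wrapped up as a reusable helper along the lines of the sketch below. To be clear, this is only a sketch: the name {{multi_label_one_hot_encode}}, the UDF-based column encoding, and the demo data are placeholders for illustration, not a settled API.

{code:python}
# Sketch only: a generic multilabel one-hot encoder for a DataFrame column
# holding an array of tags. Function name, UDF approach and demo data are
# illustrative, not a final API.
from pyspark.sql import SparkSession, functions as F, types as T


def multi_label_one_hot_encode(df, tags_col):
    """Replace tags_col (array<string>) with a fixed-length 0/1 array.

    Returns the encoded DataFrame and the index -> tag mapping needed to
    decode vector positions back to tag names.
    """
    # Collect the distinct tags to the driver and give each a stable position
    labels = sorted(
        row["tag"]
        for row in df.select(F.explode(tags_col).alias("tag")).distinct().collect()
    )
    tag_index = {tag: i for i, tag in enumerate(labels)}

    @F.udf(T.ArrayType(T.IntegerType()))
    def encode(tag_list):
        one_hot = [0] * len(labels)
        for tag in tag_list or []:
            one_hot[tag_index[tag]] = 1
        return one_hot

    return df.withColumn(tags_col, encode(F.col(tags_col))), dict(enumerate(labels))


if __name__ == "__main__":
    spark = SparkSession.builder.master("local[1]").appName("one-hot-demo").getOrCreate()
    demo = spark.createDataFrame(
        [
            (["Convert", "Decimal"], ["python", "math"]),
            (["Reliable", "timer"], ["c#", "timer"]),
        ],
        ["_Body", "_Tags"],
    )
    encoded, index_tag = multi_label_one_hot_encode(demo, "_Tags")
    encoded.show(truncate=False)  # _Tags is now e.g. [0, 1, 1, 0]
    print(index_tag)              # {0: 'c#', 1: 'math', 2: 'python', 3: 'timer'}
{code}

The UDF carries the full label index in each task's closure, which mirrors the collect/sort/enumerate approach above; for a very large tag vocabulary, broadcasting the index would probably be the safer choice.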

 

  was:
I have created the following code in Python to one-hot encode multilabel data 
and would like to add it to DataFu:

{{
questions_tags = filtered_lists.map(lambda x: Row(_Body=x[0], 
_Tags=x[1])).toDF()

# One-hot-encode the multilabel tags
enumerated_labels = [
    z for z in enumerate(
        sorted(
            remaining_tags_df.rdd
            .groupBy(lambda x: 1)
            .flatMap(lambda x: [y.tag for y in x[1]])
            .collect()
        )
    )
]
tag_index = {x: i for i, x in enumerated_labels}
index_tag = {i: x for i, x in enumerated_labels}

def one_hot_encode(tag_list, enumerated_labels):
    """PySpark can't one-hot-encode multilabel data, so we do it ourselves."""

    one_hot_row = []
    for i, label in enumerated_labels:
        if index_tag[i] in tag_list:
            one_hot_row.append(1)
        else:
            one_hot_row.append(0)
    assert(len(one_hot_row) == len(enumerated_labels))
    return one_hot_row

# Write the one-hot-encoded questions to S3 as a parquet file
one_hot_questions = questions_tags.rdd.map(
    lambda x: Row(_Body=x._Body, _Tags=one_hot_encode(x._Tags, 
enumerated_labels))
)

# Create a DataFrame for persisting as Parquet format
schema = T.StructType([
    T.StructField("_Body", T.ArrayType(
        T.StringType()
    )),
    T.StructField("_Tags", T.ArrayType(
        T.IntegerType()
    ))
])

one_hot_df = spark.createDataFrame(
    one_hot_questions,
    schema
)
one_hot_df.show()
}}




> Add MultiLabelOneHotEncoder
> ---------------------------
>
>                 Key: DATAFU-150
>                 URL: https://issues.apache.org/jira/browse/DATAFU-150
>             Project: DataFu
>          Issue Type: Improvement
>            Reporter: Russell Jurney
>            Assignee: Russell Jurney
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
