[ https://issues.apache.org/jira/browse/DATAFU-150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Russell Jurney updated DATAFU-150: ---------------------------------- Description: I have created the following code in Python to one-hot encode multilabel data and would like to add it to DataFu: {code:python} questions_tags = filtered_lists.map( lambda x: Row( _Body=x[0], _Tags=x[1] ) ).toDF() questions_tags.show() # Create indexes for each multilabel tag enumerated_labels = [ z for z in enumerate( sorted( remaining_tags_df.rdd .groupBy(lambda x: 1) .flatMap(lambda x: [y.tag for y in x[1]]) .collect() ) ) ] tag_index = {x: i for i, x in enumerated_labels} index_tag = {i: x for i, x in enumerated_labels}  # Explicitly free RAM def one_hot_encode(tag_list, enumerated_labels): """PySpark can't one-hot-encode multilabel data, so we do it ourselves.""" one_hot_row = [] for i, label in enumerated_labels: if index_tag[i] in tag_list: one_hot_row.append(1) else: one_hot_row.append(0) assert(len(one_hot_row) == len(enumerated_labels)) return one_hot_row # Write the one-hot-encoded questions to S3 as a parquet file one_hot_questions = questions_tags.rdd.map( lambda x: Row( _Body=x._Body, _Tags=one_hot_encode(x._Tags, enumerated_labels) ) ) # Create a DataFrame schema = T.StructType([ T.StructField("_Body", T.ArrayType( T.StringType() )), T.StructField("_Tags", T.ArrayType( T.IntegerType() )) ]) one_hot_df = spark.createDataFrame( one_hot_questions, schema ) one_hot_df.show() {code} Which shows: {code} +--------------------+--------------------+ | _Body| _Tags| +--------------------+--------------------+ |[Convert, Decimal...|[0, 0, 0, 0, 0, 0...| |[Percentage, widt...|[0, 0, 0, 0, 0, 0...| |[How, I, calculat...|[0, 1, 0, 0, 0, 0...| |[Calculate, relat...|[0, 0, 0, 0, 0, 0...| |[Determine, user,...|[0, 0, 0, 0, 0, 0...| |[Difference, Math...|[0, 1, 0, 0, 0, 0...| |[Filling, DataSet...|[0, 0, 1, 0, 0, 0...| |[Binary, Data, My...|[0, 0, 0, 0, 0, 0...| |[What, fastest, w...|[0, 0, 0, 0, 0, 0...| |[Throw, error, My...|[0, 0, 0, 0, 0, 0...| |[How, use, C, soc...|[0, 0, 0, 0, 0, 
0...| |[Unloading, ByteA...|[0, 0, 0, 0, 0, 0...| |[Check, changes, ...|[0, 0, 0, 0, 0, 0...| |[Reliable, timer,...|[0, 1, 0, 0, 0, 0...| |[Best, way, allow...|[0, 0, 0, 0, 0, 0...| |[Multiple, submit...|[0, 0, 0, 0, 0, 0...| |[How, I, get, dis...|[0, 0, 1, 0, 0, 0...| |[Paging, collecti...|[0, 0, 1, 0, 0, 0...| |[How, I, add, exi...|[0, 0, 0, 0, 0, 0...| |[Getting, Subclip...|[0, 0, 0, 0, 0, 0...| +--------------------+--------------------+ {code} was: I have created the following code in Python to one-hot encode multilabel data and would like to add it to DataFu: {{ questions_tags = filtered_lists.map(lambda x: Row(_Body=x[0], _Tags=x[1])).toDF() # One-hot-encode the multilabel tags enumerated_labels = [ z for z in enumerate( sorted( remaining_tags_df.rdd .groupBy(lambda x: 1) .flatMap(lambda x: [y.tag for y in x[1]]) .collect() ) ) ] tag_index = {x: i for i, x in enumerated_labels} index_tag = {i: x for i, x in enumerated_labels} def one_hot_encode(tag_list, enumerated_labels): """PySpark can't one-hot-encode multilabel data, so we do it ourselves.""" one_hot_row = [] for i, label in enumerated_labels: if index_tag[i] in tag_list: one_hot_row.append(1) else: one_hot_row.append(0) assert(len(one_hot_row) == len(enumerated_labels)) return one_hot_row # Write the one-hot-encoded questions to S3 as a parquet file one_hot_questions = questions_tags.rdd.map( lambda x: Row(_Body=x._Body, _Tags=one_hot_encode(x._Tags, enumerated_labels)) ) # Create a DataFrame for persisting as Parquet format schema = T.StructType([ T.StructField("_Body", T.ArrayType( T.StringType() )), T.StructField("_Tags", T.ArrayType( T.IntegerType() )) ]) one_hot_df = spark.createDataFrame( one_hot_questions, schema ) one_hot_df.show() }} > Add MultiLabelOneHotEncoder > --------------------------- > > Key: DATAFU-150 > URL: https://issues.apache.org/jira/browse/DATAFU-150 > Project: DataFu > Issue Type: Improvement > Reporter: Russell Jurney > Assignee: Russell Jurney > Priority: Major > > I have 
created the following code in Python to one-hot encode multilabel data > and would like to add it to DataFu: > {code:python} > questions_tags = filtered_lists.map( > lambda x: Row( > _Body=x[0], > _Tags=x[1] > ) > ).toDF() > questions_tags.show() > # Create indexes for each multilabel tag > enumerated_labels = [ > z for z in enumerate( > sorted( > remaining_tags_df.rdd > .groupBy(lambda x: 1) > .flatMap(lambda x: [y.tag for y in x[1]]) > .collect() > ) > ) > ] > tag_index = {x: i for i, x in enumerated_labels} > index_tag = {i: x for i, x in enumerated_labels}  # Explicitly free RAM > def one_hot_encode(tag_list, enumerated_labels): > """PySpark can't one-hot-encode multilabel data, so we do it ourselves.""" > one_hot_row = [] > for i, label in enumerated_labels: > if index_tag[i] in tag_list: > one_hot_row.append(1) > else: > one_hot_row.append(0) > assert(len(one_hot_row) == len(enumerated_labels)) > return one_hot_row > # Write the one-hot-encoded questions to S3 as a parquet file > one_hot_questions = questions_tags.rdd.map( > lambda x: Row( > _Body=x._Body, > _Tags=one_hot_encode(x._Tags, enumerated_labels) > ) > ) > # Create a DataFrame > schema = T.StructType([ > T.StructField("_Body", T.ArrayType( > T.StringType() > )), > T.StructField("_Tags", T.ArrayType( > T.IntegerType() > )) > ]) > one_hot_df = spark.createDataFrame( > one_hot_questions, > schema > ) > one_hot_df.show() > {code} > Which shows: > {code} > +--------------------+--------------------+ > | _Body| _Tags| > +--------------------+--------------------+ > |[Convert, Decimal...|[0, 0, 0, 0, 0, 0...| > |[Percentage, widt...|[0, 0, 0, 0, 0, 0...| > |[How, I, calculat...|[0, 1, 0, 0, 0, 0...| > |[Calculate, relat...|[0, 0, 0, 0, 0, 0...| > |[Determine, user,...|[0, 0, 0, 0, 0, 0...| > |[Difference, Math...|[0, 1, 0, 0, 0, 0...| > |[Filling, DataSet...|[0, 0, 1, 0, 0, 0...| > |[Binary, Data, My...|[0, 0, 0, 0, 0, 0...| > |[What, fastest, w...|[0, 0, 0, 0, 0, 0...| > |[Throw, error, My...|[0, 0, 0, 0, 0, 
0...| > |[How, use, C, soc...|[0, 0, 0, 0, 0, 0...| > |[Unloading, ByteA...|[0, 0, 0, 0, 0, 0...| > |[Check, changes, ...|[0, 0, 0, 0, 0, 0...| > |[Reliable, timer,...|[0, 1, 0, 0, 0, 0...| > |[Best, way, allow...|[0, 0, 0, 0, 0, 0...| > |[Multiple, submit...|[0, 0, 0, 0, 0, 0...| > |[How, I, get, dis...|[0, 0, 1, 0, 0, 0...| > |[Paging, collecti...|[0, 0, 1, 0, 0, 0...| > |[How, I, add, exi...|[0, 0, 0, 0, 0, 0...| > |[Getting, Subclip...|[0, 0, 0, 0, 0, 0...| > +--------------------+--------------------+ > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)