I'm still validating my results, but my current solution is below. I'm
presently dealing with one-hot encoded values, so every value in the
vector is 1:

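from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import SparseVector, VectorUDT

def udfMaker(feature_len):
    # StringIndexer emits doubles, so cast each index to int and sort
    # before building the SparseVector; every value is 1.0 (one-hot).
    return F.udf(lambda x: SparseVector(feature_len, sorted(int(i) for i in x),
                                        [1.0] * len(x)), VectorUDT())

indexer = StringIndexer(inputCol='contentStrings',
                        outputCol='indexedContent').fit(source_df)

makeVec = udfMaker(len(indexer.labels))

indexed_data = indexer.transform(source_df)

# collect_set removes duplicate indices within each ID
sparse_content = (indexed_data.groupBy('ID')
    .agg(F.collect_set('indexedContent').alias('contentIdx'))
    .withColumn('content', makeVec(F.col('contentIdx')))
    .drop('contentIdx')
)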
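
For the general case from the original question, where the values aren't
all 1, a minimal sketch of the same idea might look like the following. It
is untested against a real cluster. The helper name pairs_to_sparse_parts
and the "last value wins" rule for repeated indices are assumptions made for
illustration, not anything Spark provides. The pure-Python part only sorts
and de-duplicates; the Spark wiring is shown as a comment and assumes the
pairs were gathered with F.collect_list(F.struct('varIdx', 'value')).

```python
def pairs_to_sparse_parts(pairs):
    """Turn (index, value) pairs into sorted, de-duplicated index/value lists.

    Indices may arrive as floats (StringIndexer emits doubles) and in any
    order (collect_list/collect_set give no ordering guarantee).
    Assumption: if an index repeats, the last value seen wins.
    """
    by_index = {}
    for idx, val in pairs:
        by_index[int(idx)] = float(val)
    indices = sorted(by_index)
    values = [by_index[i] for i in indices]
    return indices, values


# Hypothetical wiring into Spark, in the style of udfMaker (assumes pyspark
# is installed and F, SparseVector and VectorUDT are imported):
#
#   def pairVecMaker(feature_len):
#       def build(pairs):
#           indices, values = pairs_to_sparse_parts(pairs)
#           return SparseVector(feature_len, indices, values)
#       return F.udf(build, VectorUDT())
```

Keeping the sort and de-duplication in a plain function means it can be
checked without a Spark session before it is wrapped in a UDF.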

On Tue, Jun 12, 2018 at 3:59 PM, Nathan Kronenfeld <
nkronenfeld@uncharted.software> wrote:

> I don't know if this is the best way or not, but:
>
> val indexer = new StringIndexer().setInputCol("vr").setOutputCol("vrIdx")
> val indexModel = indexer.fit(data)
> val indexedData = indexModel.transform(data)
> val variables = indexModel.labels.length
>
> val toSeq = udf((a: Double, b: Double) => Seq(a, b))
> val toVector = udf((seq: Seq[Seq[Double]]) => {
>   new SparseVector(variables, seq.map(_(0).toInt).toArray, seq.map(_(1)).toArray)
> })
> val result = indexedData
>   .withColumn("val", toSeq(col("vrIdx"), col("value")))
>   .groupBy("ID")
>   .agg(collect_set(col("val")).name("collected_val"))
>   .withColumn("collected_val",
>     toVector(col("collected_val")).as[Row](Encoders.javaSerialization(classOf[Row])))
>
>
> at least works. The indices still aren't in order in the vector. I don't
> know if this matters much, but if it does, it's easy enough to sort them in
> toVector (and to remove duplicates).
>
>
> On Tue, Jun 12, 2018 at 2:24 PM, Patrick McCarthy <pmccar...@dstillery.com
> > wrote:
>
>> I work with a lot of data in a long format, cases in which an ID column
>> is repeated, followed by a variable and a value column like so:
>>
>> +---+-----+-------+
>> |ID | var | value |
>> +---+-----+-------+
>> | A | v1  | 1.0   |
>> | A | v2  | 2.0   |
>> | B | v1  | 1.5   |
>> | B | v3  | -1.0  |
>> +---+-----+-------+
>>
>> It seems to me that Spark doesn't provide any clear native way to
>> transform data of this format into a Vector() or VectorUDT() type suitable
>> for machine learning algorithms.
>>
>> The best solution I've found so far (which isn't very good) is to group
>> by ID, perform a collect_list, and then use a UDF to translate the
>> resulting array into a vector datatype.
>>
>> I can get kind of close like so:
>>
>> indexer = MF.StringIndexer(inputCol = 'var', outputCol = 'varIdx')
>>
>> (indexed_df
>>  .withColumn('val', F.concat(
>>      F.col('varIdx').astype(T.IntegerType()).astype(T.StringType()),
>>      F.lit(':'), F.col('value')))
>>  .groupBy('ID')
>>  .agg(F.collect_set('val'))
>> )
>>
>> But the resultant 'val' vector is out of index order, and still would
>> need to be parsed.
>>
>> What's the current preferred way to solve a problem like this?
>>
>
>
