I'm still validating my results, but for the moment my solution looks like the following. I'm presently dealing with one-hot encoded values, so all the values in my vectors are 1.0:
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import SparseVector, VectorUDT

def udfMaker(feature_len):
    return F.udf(lambda x: SparseVector(feature_len, sorted(x), [1.0] * len(x)), VectorUDT())

indexer = StringIndexer(inputCol='contentStrings', outputCol='indexedContent').fit(source_df)
makeVec = udfMaker(len(indexer.labels))

indexed_data = indexer.transform(source_df)

sparse_content = (indexed_data
                  .groupBy('ID')
                  .agg(F.collect_set('indexedContent').alias('contentIdx'))
                  .withColumn('content', makeVec(F.col('contentIdx')))
                  .drop('contentIdx')
                  )

On Tue, Jun 12, 2018 at 3:59 PM, Nathan Kronenfeld <nkronenfeld@uncharted.software> wrote:

> I don't know if this is the best way or not, but:
>
> val indexer = new StringIndexer().setInputCol("vr").setOutputCol("vrIdx")
> val indexModel = indexer.fit(data)
> val indexedData = indexModel.transform(data)
> val variables = indexModel.labels.length
>
> val toSeq = udf((a: Double, b: Double) => Seq(a, b))
> val toVector = udf((seq: Seq[Seq[Double]]) => {
>   new SparseVector(variables, seq.map(_(0).toInt).toArray, seq.map(_(1)).toArray)
> })
> val result = indexedData
>   .withColumn("val", toSeq(col("vrIdx"), col("value")))
>   .groupBy("ID")
>   .agg(collect_set(col("val")).name("collected_val"))
>   .withColumn("collected_val",
>     toVector(col("collected_val")).as[Row](Encoders.javaSerialization(classOf[Row])))
>
> at least works. The indices still aren't in order in the vector - I don't know if this matters much, but if it does, it's easy enough to sort them in toVector (and to remove duplicates).
>
> On Tue, Jun 12, 2018 at 2:24 PM, Patrick McCarthy <pmccar...@dstillery.com> wrote:
>
>> I work with a lot of data in a long format, cases in which an ID column is repeated, followed by a variable and a value column, like so:
>>
>> +---+-----+-------+
>> |ID | var | value |
>> +---+-----+-------+
>> | A | v1  | 1.0   |
>> | A | v2  | 2.0   |
>> | B | v1  | 1.5   |
>> | B | v3  | -1.0  |
>> +---+-----+-------+
>>
>> It seems to me that Spark doesn't provide any clear native way to transform data of this format into a Vector() or VectorUDT() type suitable for machine learning algorithms.
>>
>> The best solution I've found so far (which isn't very good) is to group by ID, perform a collect_list, and then use a UDF to translate the resulting array into a vector datatype.
>>
>> I can get kind of close like so:
>>
>> indexer = MF.StringIndexer(inputCol='var', outputCol='varIdx')
>>
>> (indexed_df
>>  .withColumn('val', F.concat(F.col('varIdx').astype(T.IntegerType()).astype(T.StringType()),
>>                              F.lit(':'), F.col('value')))
>>  .groupBy('ID')
>>  .agg(F.collect_set('val'))
>> )
>>
>> But the resulting 'val' array is out of index order, and would still need to be parsed.
>>
>> What's the current preferred way to solve a problem like this?
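For reference, here is a minimal self-contained sketch of the collect-and-UDF approach discussed above, keeping the value column rather than one-hot values. This is a sketch only, assuming PySpark >= 2.0; the column names ID/var/value come from the example table, while the helper names to_sparse and n_features are mine, not from the snippets above. It sorts the indices and drops duplicates inside the UDF, which covers the ordering concern Nathan raised:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import SparseVector, VectorUDT

spark = SparkSession.builder.getOrCreate()

# Toy long-format data matching the example table above.
long_df = spark.createDataFrame(
    [('A', 'v1', 1.0), ('A', 'v2', 2.0), ('B', 'v1', 1.5), ('B', 'v3', -1.0)],
    ['ID', 'var', 'value'])

indexer = StringIndexer(inputCol='var', outputCol='varIdx').fit(long_df)
n_features = len(indexer.labels)  # illustrative name; vector dimensionality

def to_sparse(pairs):
    # 'pairs' arrives as a list of (varIdx, value) structs. SparseVector
    # expects strictly increasing indices, so sort the pairs and keep the
    # first value seen per index (deduplication).
    seen = {}
    for idx, val in sorted(pairs):
        seen.setdefault(int(idx), float(val))
    keys = sorted(seen)
    return SparseVector(n_features, keys, [seen[k] for k in keys])

to_sparse_udf = F.udf(to_sparse, VectorUDT())

features = (indexer.transform(long_df)
            .withColumn('pair', F.struct('varIdx', 'value'))
            .groupBy('ID')
            .agg(F.collect_list('pair').alias('pairs'))
            .withColumn('features', to_sparse_udf('pairs'))
            .drop('pairs'))

features.show(truncate=False)

Collecting (index, value) structs instead of concatenated strings avoids the parse step from the original question, and doing the sort/dedup inside the one UDF keeps everything in a single pass over each group.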