So I wrote some code to reproduce the problem.

I assume here that a pipeline should be able to transform a categorical feature 
with a few million levels.
So I create a dataframe with the categorical feature ('id'), apply a 
StringIndexer and a OneHotEncoder transformer, and run a loop in which I increase 
the number of levels.
It breaks at 1,276,000 levels with java.lang.OutOfMemoryError: GC overhead limit 
exceeded, thrown from OneHotEncoder.transform while it builds the AttributeGroup 
metadata (full stack trace below).

Shall I report this as a ticket in JIRA?

____________


from pyspark.sql.functions import rand
from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
from pyspark.ml import Pipeline

start_id = 100000
n = 5000000
step = (n - start_id) / 25

for i in xrange(start_id,start_id + n,step):
    print "#########\n {}".format(i)
    dfr = (sqlContext
           .range(start_id, start_id + i)
           .withColumn('label', rand(seed=10))
           .withColumn('feat2', rand(seed=101))
    #        .withColumn('normal', randn(seed=27))
           ).repartition(32).cache()
    # dfr.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
    dfr.show(1)
    print "This dataframe has {0} rows (and therefore {0} levels will be one hot encoded)".format(dfr.count())

    categorical_feature = ['id']
    stages = []

    for c in categorical_feature:
        stages.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
        stages.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))

    columns = ["{}OHE".format(x) for x in categorical_feature]
    columns.append('feat2')
    
    assembler = VectorAssembler(
        inputCols=columns,
        outputCol="features")
    stages.append(assembler)

    df2 = dfr

    pipeline = Pipeline(stages=stages)
    pipeline_fitted = pipeline.fit(df2)
    df3 = pipeline_fitted.transform(df2)
    df3.show(1)
    dfr.unpersist()


____________

Output:

#########
 100000
+------+---------------------------+-------------------+
|    id|label                      |              feat2|
+------+---------------------------+-------------------+
|183601|        0.38693226548356197|0.04485291680169634|
+------+---------------------------+-------------------+
only showing top 1 row

This dataframe has 100000 rows (and therefore 100000 levels will be one hot encoded)
+------+---------------------------+-------------------+-------+--------------------+--------------------+
|    id|label                      |              feat2|idIndex|               idOHE|            features|
+------+---------------------------+-------------------+-------+--------------------+--------------------+
|183601|        0.38693226548356197|0.04485291680169634|83240.0|(100000,[83240],[...|(100001,[83240,10...|
+------+---------------------------+-------------------+-------+--------------------+--------------------+
only showing top 1 row

#########
 296000
+------+---------------------------+-------------------+
|    id|label                      |              feat2|
+------+---------------------------+-------------------+
|137008|         0.2996020619810592|0.38693226548356197|
+------+---------------------------+-------------------+
only showing top 1 row

This dataframe has 296000 rows (and therefore 296000 levels will be one hot encoded)
+------+---------------------------+-------------------+-------+--------------------+--------------------+
|    id|label                      |              feat2|idIndex|               idOHE|            features|
+------+---------------------------+-------------------+-------+--------------------+--------------------+
|137008|         0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
+------+---------------------------+-------------------+-------+--------------------+--------------------+
only showing top 1 row

#########
 492000
+------+---------------------------+-------------------+
|    id|label                      |              feat2|
+------+---------------------------+-------------------+
|534351|         0.9450641392552516|0.23472935141246665|
+------+---------------------------+-------------------+
only showing top 1 row

This dataframe has 492000 rows (and therefore 492000 levels will be one hot encoded)
+------+---------------------------+-------------------+-------+--------------------+--------------------+
|    id|label                      |              feat2|idIndex|               idOHE|            features|
+------+---------------------------+-------------------+-------+--------------------+--------------------+
|534351|         0.9450641392552516|0.23472935141246665| 3656.0|(492000,[3656],[1...|(492001,[3656,492...|
+------+---------------------------+-------------------+-------+--------------------+--------------------+
only showing top 1 row

#########
 688000
+------+---------------------------+------------------+
|    id|label                      |             feat2|
+------+---------------------------+------------------+
|573008|         0.3059347083549171|0.4846147657830415|
+------+---------------------------+------------------+
only showing top 1 row

This dataframe has 688000 rows (and therefore 688000 levels will be one hot encoded)
+------+---------------------------+------------------+--------+--------------------+--------------------+
|    id|label                      |             feat2| idIndex|               idOHE|            features|
+------+---------------------------+------------------+--------+--------------------+--------------------+
|573008|         0.3059347083549171|0.4846147657830415|475855.0|(688000,[475855],...|(688001,[475855,6...|
+------+---------------------------+------------------+--------+--------------------+--------------------+
only showing top 1 row

#########
 884000
+------+---------------------------+------------------+
|    id|label                      |             feat2|
+------+---------------------------+------------------+
|970195|        0.34345290476989165|0.9843176058907069|
+------+---------------------------+------------------+
only showing top 1 row

This dataframe has 884000 rows (and therefore 884000 levels will be one hot encoded)
+------+---------------------------+------------------+--------+--------------------+--------------------+
|    id|label                      |             feat2| idIndex|               idOHE|            features|
+------+---------------------------+------------------+--------+--------------------+--------------------+
|970195|        0.34345290476989165|0.9843176058907069|333915.0|(884000,[333915],...|(884001,[333915,8...|
+------+---------------------------+------------------+--------+--------------------+--------------------+
only showing top 1 row

#########
 1080000
+------+---------------------------+-----------------+
|    id|label                      |            feat2|
+------+---------------------------+-----------------+
|403758|         0.6333344187975314|0.774327685753309|
+------+---------------------------+-----------------+
only showing top 1 row

This dataframe has 1080000 rows (and therefore 1080000 levels will be one hot encoded)
+------+---------------------------+-----------------+--------+--------------------+--------------------+
|    id|label                      |            feat2| idIndex|               idOHE|            features|
+------+---------------------------+-----------------+--------+--------------------+--------------------+
|403758|         0.6333344187975314|0.774327685753309|287898.0|(1080000,[287898]...|(1080001,[287898,...|
+------+---------------------------+-----------------+--------+--------------------+--------------------+
only showing top 1 row

#########
 1276000
+------+---------------------------+------------------+
|    id|label                      |             feat2|
+------+---------------------------+------------------+
|508726|         0.2513814327408137|0.8480577183702391|
+------+---------------------------+------------------+
only showing top 1 row

This dataframe has 1276000 rows (and therefore 1276000 levels will be one hot encoded)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-f5c9fe263872> in <module>()
     38     pipeline = Pipeline(stages=stages)
     39     pipeline_fitted = pipeline.fit(df2)
---> 40     df3 = pipeline_fitted.transform(df2)
     41     df3.show(1)
     42     dfr.unpersist()

/opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
    103                 return self.copy(params)._transform(dataset)
    104             else:
--> 105                 return self._transform(dataset)
    106         else:
    107             raise ValueError("Params must be a param map but got %s." % type(params))

/opt/spark/2.0.0/python/pyspark/ml/pipeline.py in _transform(self, dataset)
    196     def _transform(self, dataset):
    197         for t in self.stages:
--> 198             dataset = t.transform(dataset)
    199         return dataset
    200 

/opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
    103                 return self.copy(params)._transform(dataset)
    104             else:
--> 105                 return self._transform(dataset)
    106         else:
    107             raise ValueError("Params must be a param map but got %s." % type(params))

/opt/spark/2.0.0/python/pyspark/ml/wrapper.py in _transform(self, dataset)
    227     def _transform(self, dataset):
    228         self._transfer_params_to_java()
--> 229         return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
    230 
    231 

/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/opt/spark/2.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o408.transform.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.immutable.Stream$.from(Stream.scala:1262)
        at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
        at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
        at scala.collection.LinearSeqOptimized$class.loop$1(LinearSeqOptimized.scala:274)
        at scala.collection.LinearSeqOptimized$class.lengthCompare(LinearSeqOptimized.scala:277)
        at scala.collection.immutable.Stream.lengthCompare(Stream.scala:202)
        at scala.collection.SeqViewLike$Zipped$class.length(SeqViewLike.scala:133)
        at scala.collection.SeqViewLike$$anon$9.length(SeqViewLike.scala:203)
        at scala.collection.SeqViewLike$Mapped$class.length(SeqViewLike.scala:66)
        at scala.collection.SeqViewLike$$anon$3.length(SeqViewLike.scala:197)
        at scala.collection.SeqLike$class.size(SeqLike.scala:106)
        at scala.collection.SeqViewLike$AbstractTransformed.size(SeqViewLike.scala:37)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:285)
        at scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
        at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
        at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
        at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
        at org.apache.spark.ml.attribute.AttributeGroup$.fromMetadata(AttributeGroup.scala:234)
        at org.apache.spark.ml.attribute.AttributeGroup$.fromStructField(AttributeGroup.scala:246)
        at org.apache.spark.ml.feature.OneHotEncoder.transform(OneHotEncoder.scala:139)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)



Spark Properties        
Name    Value
spark.app.name  pyspark-shell
spark.driver.cores      1
spark.driver.extraJavaOptions   -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
spark.driver.memory     2g
spark.dynamicAllocation.enabled FALSE
spark.eventLog.dir      hdfs:///spark/history
spark.eventLog.enabled  TRUE
spark.executor.cores    1
spark.executor.extraJavaOptions -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
spark.executor.id       driver
spark.executor.instances        128
spark.executor.memory   2g
spark.history.fs.logDirectory   hdfs:///spark/history
spark.master    yarn-client
spark.memory.fraction   0.7
spark.memory.storageFraction    0.5
spark.rdd.compress      TRUE
spark.scheduler.mode    FIFO
spark.serializer.objectStreamReset      100
spark.shuffle.service.enabled   FALSE
spark.speculation       TRUE
spark.submit.deployMode client
spark.task.maxFailures  10
spark.yarn.executor.memoryOverhead      2048
spark.yarn.isPython     TRUE
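
____________

For reference, a minimal (untested) sketch of the hashing-trick alternative discussed in the 
quoted thread below. The column names 'id_arr' / 'idHashed' and numFeatures=2**24 are my own 
assumptions, not values from the experiments above; HashingTF expects an array-typed input 
column, so the id is cast to string and wrapped in an array first.

from pyspark.sql.functions import array, col
from pyspark.ml.feature import HashingTF

# assumption: reuse dfr from the loop above; 2**24 is simply "a power of 2" as suggested, not a tuned value
df_arr = dfr.withColumn('id_arr', array(col('id').cast('string')))
hasher = HashingTF(numFeatures=2**24, inputCol='id_arr', outputCol='idHashed')
hashed = hasher.transform(df_arr)   # hashes each element of id_arr into a 2**24-dimensional sparse vector
hashed.show(1)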


> On Aug 11, 2016, at 10:24 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
> 
> Ok, interesting. Would be interested to see how it compares.
> 
> By the way, the feature size you select for the hasher should be a power of 2 
> (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes are 
> evenly distributed (see the section on HashingTF under 
> http://spark.apache.org/docs/latest/ml-features.html#tf-idf).
> 
> On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen <bteeu...@gmail.com> wrote:
> Thanks Nick, I played around with the hashing trick. When I set numFeatures 
> to the number of distinct values of the largest sparse feature, I ended up 
> with half of them colliding. When raising numFeatures to get fewer 
> collisions, I soon ran into the same memory problems as before. To be 
> honest, I didn't test the impact of more or fewer collisions on the 
> quality of the predictions, but tunnel-visioned into getting it to work with 
> the full sparsity.
> 
> Before, I worked in RDD land: zipWithIndex on an RDD with the distinct values 
> (plus one entry 'missing' for values unseen at predict time), collectAsMap, 
> broadcast the map, a udf generating sparse vectors, and assembling the vectors 
> manually. To move into dataframe land, I wrote:
> 
> def getMappings(mode):
>     mappings = defaultdict(dict)
>     max_index = 0
>     for c in cat_int[:]:    # for every categorical variable
> 
>         logging.info("starting with {}".format(c))
>         if mode == 'train':
>             grouped = (df2
>                 .groupBy(c).count().orderBy('count', ascending = False)  # get counts, ordered from largest to smallest
>                 .selectExpr("*", "1 as n")      # prepare for window function summing up 1s before current row to create a RANK
>                 .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
>                 .drop('n') # drop the column with static 1 values used for the cumulative sum
>                 )
>             logging.info("Got {} rows.".format(grouped.count()))
>             grouped.show()
>             logging.info('getting max')
>             max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()  # update the max index so the next categorical feature starts with it
>             logging.info("max_index has become: {}".format(max_index))
>             logging.info('adding missing value, so we also train on this and prediction data missing it.')
>             schema = grouped.schema
>             logging.info(schema)
>             grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))  # add index for missing value, for values during predict that are unseen during training
>             max_index += 1
>             saveto = "{}/{}".format(path, c)
>             logging.info("Writing to: {}".format(saveto))
>             grouped.write.parquet(saveto, mode = 'overwrite')
> 
>         elif mode == 'predict':
>             loadfrom = "{}/{}".format(path, c)
>             logging.info("Reading from: {}".format(loadfrom))
>             grouped = spark.read.parquet(loadfrom)
> 
>         logging.info("Adding to dictionary")
>         mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()  # build up dictionary to be broadcasted later on, used for creating sparse vectors
>         max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
> 
>     logging.info("Sanity check for indexes:")
>     for c in cat_int[:]:
>         logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))   # some logging to confirm the indexes
>         logging.info("Missing value = {}".format(mappings[c]['missing']))
>     return max_index, mappings
> 
> I’d love to see the StringIndexer + OneHotEncoder transformers cope with 
> missing values during prediction; for now I’ll work with the hacked stuff 
> above :).
> (.. and I should compare the performance with using the hashing trick.)
> 
> Ben
