So I wrote some code to reproduce the problem. I assume here that a pipeline should be able to transform a categorical feature with a few million levels. So I create a dataframe with a categorical feature ('id'), apply a StringIndexer and OneHotEncoder transformer, and run a loop in which I increase the number of levels. It breaks at 1,276,000 levels.
Shall I report this as a ticket in JIRA?

____________

from pyspark.sql.functions import rand
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

start_id = 100000
n = 5000000
step = (n - start_id) / 25

for i in xrange(start_id, start_id + n, step):
    print "#########\n {}".format(i)
    dfr = (sqlContext
           .range(start_id, start_id + i)
           .withColumn('label', rand(seed=10))
           .withColumn('feat2', rand(seed=101))
           # .withColumn('normal', randn(seed=27))
           ).repartition(32).cache()
    # dfr.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
    dfr.show(1)
    print "This dataframe has {0} rows (and therefore {0} levels will be one hot encoded)".format(dfr.count())

    categorical_feature = ['id']
    stages = []
    for c in categorical_feature:
        stages.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
        stages.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))

    columns = ["{}OHE".format(x) for x in categorical_feature]
    columns.append('feat2')
    assembler = VectorAssembler(inputCols=columns, outputCol="features")
    stages.append(assembler)

    df2 = dfr
    pipeline = Pipeline(stages=stages)
    pipeline_fitted = pipeline.fit(df2)
    df3 = pipeline_fitted.transform(df2)
    df3.show(1)
    dfr.unpersist()

____________

Output:

#########
 100000
+------+-------------------+-------------------+
|    id|              label|              feat2|
+------+-------------------+-------------------+
|183601|0.38693226548356197|0.04485291680169634|
+------+-------------------+-------------------+
only showing top 1 row

This dataframe has 100000 rows (and therefore 100000 levels will be one hot encoded)
+------+-------------------+-------------------+-------+--------------------+--------------------+
|    id|              label|              feat2|idIndex|               idOHE|            features|
+------+-------------------+-------------------+-------+--------------------+--------------------+
|183601|0.38693226548356197|0.04485291680169634|83240.0|(100000,[83240],[...|(100001,[83240,10...|
+------+-------------------+-------------------+-------+--------------------+--------------------+
only showing top 1 row

#########
 296000
+------+------------------+-------------------+
|    id|             label|              feat2|
+------+------------------+-------------------+
|137008|0.2996020619810592|0.38693226548356197|
+------+------------------+-------------------+
only showing top 1 row

This dataframe has 296000 rows (and therefore 296000 levels will be one hot encoded)
+------+------------------+-------------------+-------+--------------------+--------------------+
|    id|             label|              feat2|idIndex|               idOHE|            features|
+------+------------------+-------------------+-------+--------------------+--------------------+
|137008|0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
+------+------------------+-------------------+-------+--------------------+--------------------+
only showing top 1 row

#########
 492000
+------+------------------+-------------------+
|    id|             label|              feat2|
+------+------------------+-------------------+
|534351|0.9450641392552516|0.23472935141246665|
+------+------------------+-------------------+
only showing top 1 row

This dataframe has 492000 rows (and therefore 492000 levels will be one hot encoded)
+------+------------------+-------------------+-------+--------------------+--------------------+
|    id|             label|              feat2|idIndex|               idOHE|            features|
+------+------------------+-------------------+-------+--------------------+--------------------+
|534351|0.9450641392552516|0.23472935141246665| 3656.0|(492000,[3656],[1...|(492001,[3656,492...|
+------+------------------+-------------------+-------+--------------------+--------------------+
only showing top 1 row

#########
 688000
+------+------------------+------------------+
|    id|             label|             feat2|
+------+------------------+------------------+
|573008|0.3059347083549171|0.4846147657830415|
+------+------------------+------------------+
only showing top 1 row

This dataframe has 688000 rows (and therefore 688000 levels will be one hot encoded)
+------+------------------+------------------+--------+--------------------+--------------------+
|    id|             label|             feat2| idIndex|               idOHE|            features|
+------+------------------+------------------+--------+--------------------+--------------------+
|573008|0.3059347083549171|0.4846147657830415|475855.0|(688000,[475855],...|(688001,[475855,6...|
+------+------------------+------------------+--------+--------------------+--------------------+
only showing top 1 row

#########
 884000
+------+-------------------+------------------+
|    id|              label|             feat2|
+------+-------------------+------------------+
|970195|0.34345290476989165|0.9843176058907069|
+------+-------------------+------------------+
only showing top 1 row

This dataframe has 884000 rows (and therefore 884000 levels will be one hot encoded)
+------+-------------------+------------------+--------+--------------------+--------------------+
|    id|              label|             feat2| idIndex|               idOHE|            features|
+------+-------------------+------------------+--------+--------------------+--------------------+
|970195|0.34345290476989165|0.9843176058907069|333915.0|(884000,[333915],...|(884001,[333915,8...|
+------+-------------------+------------------+--------+--------------------+--------------------+
only showing top 1 row

#########
 1080000
+------+------------------+-----------------+
|    id|             label|            feat2|
+------+------------------+-----------------+
|403758|0.6333344187975314|0.774327685753309|
+------+------------------+-----------------+
only showing top 1 row

This dataframe has 1080000 rows (and therefore 1080000 levels will be one hot encoded)
+------+------------------+-----------------+--------+--------------------+--------------------+
|    id|             label|            feat2| idIndex|               idOHE|            features|
+------+------------------+-----------------+--------+--------------------+--------------------+
|403758|0.6333344187975314|0.774327685753309|287898.0|(1080000,[287898]...|(1080001,[287898,...|
+------+------------------+-----------------+--------+--------------------+--------------------+
only showing top 1 row

#########
 1276000
+------+------------------+------------------+
|    id|             label|             feat2|
+------+------------------+------------------+
|508726|0.2513814327408137|0.8480577183702391|
+------+------------------+------------------+
only showing top 1 row

This dataframe has 1276000 rows (and therefore 1276000 levels will be one hot encoded)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-f5c9fe263872> in <module>()
     38     pipeline = Pipeline(stages=stages)
     39     pipeline_fitted = pipeline.fit(df2)
---> 40     df3 = pipeline_fitted.transform(df2)
     41     df3.show(1)
     42     dfr.unpersist()

/opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
    103                 return self.copy(params)._transform(dataset)
    104             else:
--> 105                 return self._transform(dataset)
    106         else:
    107             raise ValueError("Params must be a param map but got %s." % type(params))

/opt/spark/2.0.0/python/pyspark/ml/pipeline.py in _transform(self, dataset)
    196     def _transform(self, dataset):
    197         for t in self.stages:
--> 198             dataset = t.transform(dataset)
    199         return dataset
    200

/opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
    103                 return self.copy(params)._transform(dataset)
    104             else:
--> 105                 return self._transform(dataset)
    106         else:
    107             raise ValueError("Params must be a param map but got %s." % type(params))

/opt/spark/2.0.0/python/pyspark/ml/wrapper.py in _transform(self, dataset)
    227     def _transform(self, dataset):
    228         self._transfer_params_to_java()
--> 229         return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
    230
    231

/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934
    935         for temp_arg in temp_args:

/opt/spark/2.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o408.transform.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at scala.collection.immutable.Stream$.from(Stream.scala:1262)
	at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
	at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
	at scala.collection.LinearSeqOptimized$class.loop$1(LinearSeqOptimized.scala:274)
	at scala.collection.LinearSeqOptimized$class.lengthCompare(LinearSeqOptimized.scala:277)
	at scala.collection.immutable.Stream.lengthCompare(Stream.scala:202)
	at scala.collection.SeqViewLike$Zipped$class.length(SeqViewLike.scala:133)
	at scala.collection.SeqViewLike$$anon$9.length(SeqViewLike.scala:203)
	at scala.collection.SeqViewLike$Mapped$class.length(SeqViewLike.scala:66)
	at scala.collection.SeqViewLike$$anon$3.length(SeqViewLike.scala:197)
	at scala.collection.SeqLike$class.size(SeqLike.scala:106)
	at scala.collection.SeqViewLike$AbstractTransformed.size(SeqViewLike.scala:37)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:285)
	at scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
	at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
	at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
	at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
	at org.apache.spark.ml.attribute.AttributeGroup$.fromMetadata(AttributeGroup.scala:234)
	at org.apache.spark.ml.attribute.AttributeGroup$.fromStructField(AttributeGroup.scala:246)
	at org.apache.spark.ml.feature.OneHotEncoder.transform(OneHotEncoder.scala:139)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)

Spark Properties

Name                                 Value
spark.app.name                       pyspark-shell
spark.driver.cores                   1
spark.driver.extraJavaOptions        -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
spark.driver.memory                  2g
spark.dynamicAllocation.enabled      FALSE
spark.eventLog.dir                   hdfs:///spark/history
spark.eventLog.enabled               TRUE
spark.executor.cores                 1
spark.executor.extraJavaOptions      -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
spark.executor.id                    driver
spark.executor.instances             128
spark.executor.memory                2g
spark.history.fs.logDirectory        hdfs:///spark/history
spark.master                         yarn-client
spark.memory.fraction                0.7
spark.memory.storageFraction         0.5
spark.rdd.compress                   TRUE
spark.scheduler.mode                 FIFO
spark.serializer.objectStreamReset   100
spark.shuffle.service.enabled        FALSE
spark.speculation                    TRUE
spark.submit.deployMode              client
spark.task.maxFailures               10
spark.yarn.executor.memoryOverhead   2048
spark.yarn.isPython                  TRUE

> On Aug 11, 2016, at 10:24 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>
> Ok, interesting. Would be interested to see how it compares.
>
> By the way, the feature size you select for the hasher should be a power of 2 (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes are evenly distributed (see the section on HashingTF under http://spark.apache.org/docs/latest/ml-features.html#tf-idf).
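For a rough sense of why the hasher width matters here, the expected collision rate of a uniform hash can be estimated with a birthday-problem approximation. This is a back-of-the-envelope sketch, not Spark code; the function name and example numbers are mine, and it assumes the hash spreads keys uniformly:

```python
import math

def collision_fraction(n_keys, n_buckets):
    # Fraction of keys expected to share a bucket with at least one
    # other key: P(a given key is alone) = (1 - 1/m)^(n-1) ~= exp(-(n-1)/m).
    return 1.0 - math.exp(-(n_keys - 1) / float(n_buckets))

n = 1276000  # distinct levels at the point where the pipeline broke
print("numFeatures = n     -> %.2f of keys collide" % collision_fraction(n, n))        # ~0.63
print("numFeatures = 2**24 -> %.2f of keys collide" % collision_fraction(n, 2 ** 24))  # ~0.07
```

Under this estimate, sizing numFeatures at roughly the cardinality leaves most keys sharing a bucket, while a wide power of 2 like 2**24 brings that down to a few percent.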
>
> On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen <bteeu...@gmail.com> wrote:
> Thanks Nick, I played around with the hashing trick. When I set numFeatures to the number of distinct values of the largest sparse feature, I ended up with half of them colliding. When I raised numFeatures to get fewer collisions, I soon ran into the same memory problems as before. To be honest, I didn't test the impact of having more or fewer collisions on the quality of the predictions, but tunnel-visioned into getting it to work at the full sparsity.
>
> Before, I worked in RDD land: zipWithIndex on an RDD of the distinct values plus one entry 'missing' (for missing values during predict), collectAsMap, broadcast the map, a udf generating the sparse vectors, and assembling the vectors manually. To move into dataframe land, I wrote:
>
> def getMappings(mode):
>     mappings = defaultdict(dict)
>     max_index = 0
>     for c in cat_int[:]:  # for every categorical variable
>         logging.info("starting with {}".format(c))
>         if mode == 'train':
>             grouped = (df2
>                 .groupBy(c).count().orderBy('count', ascending=False)  # get counts, ordered from largest to smallest
>                 .selectExpr("*", "1 as n")  # prepare for a window function summing up the 1s before the current row, creating a rank
>                 .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
>                 .drop('n')  # drop the column with static 1 values used for the cumulative sum
>             )
>             logging.info("Got {} rows.".format(grouped.count()))
>             grouped.show()
>             logging.info('getting max')
>             max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()  # update the max index so the next categorical feature starts after it
>             logging.info("max_index has become: {}".format(max_index))
>             logging.info('adding missing value, so we also train on this and prediction data missing it.')
>             schema = grouped.schema
>             logging.info(schema)
>             grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))  # add an index for the 'missing' value, for values during predict that are unseen during training
>             max_index += 1
>             saveto = "{}/{}".format(path, c)
>             logging.info("Writing to: {}".format(saveto))
>             grouped.write.parquet(saveto, mode='overwrite')
>         elif mode == 'predict':
>             loadfrom = "{}/{}".format(path, c)
>             logging.info("Reading from: {}".format(loadfrom))
>             grouped = spark.read.parquet(loadfrom)
>         logging.info("Adding to dictionary")
>         mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()  # build up a dictionary to be broadcast later on, used for creating sparse vectors
>         max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>
>     logging.info("Sanity check for indexes:")
>     for c in cat_int[:]:
>         logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))  # some logging to confirm the indexes
>         logging.info("Missing value = {}".format(mappings[c]['missing']))
>     return max_index, mappings
>
> I'd love to see the StringIndexer + OneHotEncoder transformers cope with missing values during prediction; for now I'll work with the hacked stuff above :).
> (.. and I should compare the performance with using the hashing trick.)
>
> Ben
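The heart of the quoted workaround is a plain dictionary from distinct training values to dense indexes, with one extra sentinel index for values that only show up at prediction time. A minimal, Spark-free sketch of that idea (the function names are mine):

```python
def build_index(train_values):
    # One dense index per distinct training value; sorted for determinism.
    mapping = {v: i for i, v in enumerate(sorted(set(train_values)))}
    missing_index = len(mapping)  # sentinel for values unseen in training
    return mapping, missing_index

def to_index(value, mapping, missing_index):
    # Unseen values at prediction time fall back to the sentinel index.
    return mapping.get(value, missing_index)

mapping, missing = build_index(["a", "b", "a", "c"])
assert to_index("b", mapping, missing) == 1
assert to_index("zzz", mapping, missing) == 3  # unseen -> sentinel
```

For what it's worth, later Spark releases added a handleInvalid='keep' option to StringIndexer that keeps unseen values in a reserved bucket, which addresses exactly this prediction-time gap (whether it copes with this cardinality is a separate question).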