I don’t think scaling RAM is a sane strategy for fixing these problems with using a dataframe / transformer approach to create large sparse vectors.
First, although adding memory delays the failure, it does not prevent it. I tried this on the original case I emailed about, and after waiting 50 minutes it still broke. Second, if you skip dataframes / transformers and write your own functions for one-hot encoding and creating sparse vectors, it works easily on small boxes. For example, build up a dictionary with a unique index number for every unique value, and look up these indexes when creating sparse vectors:

from pyspark.ml.linalg import Vectors

def makeDict(df, columnName):
    # Map every distinct value of the column to a contiguous index.
    # DataFrame.map no longer exists in PySpark 2.0, so go through .rdd.
    mapping = (df.select(columnName)
                 .rdd.map(lambda x: unicode(x[0]))
                 .distinct()
                 .zipWithIndex()
                 .collectAsMap())
    # Extra slot for values that were not seen when the dictionary was built.
    mapping["missing"] = len(mapping)
    return mapping

def encodeOneHot(x, column):
    # mappings_bc is assumed to be a broadcast variable, e.g.
    # sc.broadcast({c: makeDict(df, c) for c in categorical_columns}),
    # where categorical_columns is a hypothetical list of column names.
    mapping = mappings_bc.value[column]
    key = unicode(x) if unicode(x) in mapping else "missing"
    return Vectors.sparse(len(mapping), [mapping[key]], [1.0])

Ben

> On Aug 19, 2016, at 11:34 PM, Davies Liu <dav...@databricks.com> wrote:
>
> The OOM happens in the driver, so you may also need more memory for the driver.
>
> On Fri, Aug 19, 2016 at 2:33 PM, Davies Liu <dav...@databricks.com> wrote:
>> You are using lots of tiny executors (128 executors with only 2G of
>> memory each). Could you try bigger executors (for example 16G x 16)?
>>
>> On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>>>
>>> So I wrote some code to reproduce the problem.
>>>
>>> I assume here that a pipeline should be able to transform a categorical
>>> feature with a few million levels.
>>> So I create a dataframe with the categorical feature ('id'), apply a
>>> StringIndexer and OneHotEncoder transformer, and run a loop where I
>>> increase the number of levels.
>>> It breaks at 1,276,000 levels.
>>>
>>> Shall I report this as a ticket in JIRA?
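The dictionary approach from Ben's reply at the top of the thread (makeDict plus encodeOneHot) can be illustrated without a cluster. This is a minimal pure-Python sketch, not Ben's code. The names make_dict and encode_one_hot are hypothetical, a plain list stands in for the DataFrame column, and a (size, indices, values) tuple stands in for Vectors.sparse. Unlike zipWithIndex on a distinct RDD, it numbers values in first-seen order.

```python
def make_dict(values):
    """Give every distinct value a contiguous index, plus one 'missing' slot.

    Hypothetical stand-in for makeDict: values is a plain iterable here,
    not a DataFrame column, and indexes follow first-seen order.
    """
    mapping = {}
    for v in values:
        key = str(v)
        if key not in mapping:
            mapping[key] = len(mapping)
    # Extra slot for values that were not seen when the dictionary was built.
    # Note: a real value spelled "missing" would be folded into this slot.
    mapping["missing"] = len(mapping)
    return mapping


def encode_one_hot(x, mapping):
    """One-hot encode x as a (size, indices, values) triple, like Vectors.sparse."""
    key = str(x)
    if key not in mapping:
        key = "missing"  # unseen value -> shared 'missing' index
    return (len(mapping), [mapping[key]], [1.0])


mapping = make_dict(["a", "b", "a", "c"])
print(encode_one_hot("b", mapping))    # (4, [1], [1.0])
print(encode_one_hot("zzz", mapping))  # (4, [3], [1.0])
```

The dictionary itself still has to fit on the driver and in every broadcast copy. This approach shifts the memory cost to a single lookup table rather than removing it.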
>>>
>>> ____________
>>>
>>>
>>> from pyspark.sql.functions import rand
>>> from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
>>> from pyspark.ml import Pipeline
>>>
>>> start_id = 100000
>>> n = 5000000
>>> step = (n - start_id) / 25
>>>
>>> for i in xrange(start_id, start_id + n, step):
>>>     print "#########\n {}".format(i)
>>>     dfr = (sqlContext
>>>            .range(start_id, start_id + i)
>>>            .withColumn('label', rand(seed=10))
>>>            .withColumn('feat2', rand(seed=101))
>>>            # .withColumn('normal', randn(seed=27))
>>>            ).repartition(32).cache()
>>>     # dfr.select("id", rand(seed=10).alias("uniform"),
>>>     #            randn(seed=27).alias("normal")).show()
>>>     dfr.show(1)
>>>     print "This dataframe has {0} rows (and therefore {0} levels will be one hot encoded)".format(dfr.count())
>>>
>>>     categorical_feature = ['id']
>>>     stages = []
>>>
>>>     for c in categorical_feature:
>>>         stages.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
>>>         stages.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))
>>>
>>>     columns = ["{}OHE".format(x) for x in categorical_feature]
>>>     columns.append('feat2')
>>>
>>>     assembler = VectorAssembler(
>>>         inputCols=columns,
>>>         outputCol="features")
>>>     stages.append(assembler)
>>>
>>>     df2 = dfr
>>>
>>>     pipeline = Pipeline(stages=stages)
>>>     pipeline_fitted = pipeline.fit(df2)
>>>     df3 = pipeline_fitted.transform(df2)
>>>     df3.show(1)
>>>     dfr.unpersist()
>>>
>>>
>>> ____________
>>>
>>> Output:
>>>
>>>
>>> #########
>>> 100000
>>> +------+---------------------------+-------------------+
>>> | id|label | feat2|
>>> +------+---------------------------+-------------------+
>>> |183601| 0.38693226548356197|0.04485291680169634|
>>> +------+---------------------------+-------------------+
>>> only showing top 1 row
>>>
>>> This dataframe has 100000 rows (and therefore 100000 levels will be one hot encoded)
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> | id|label | feat2|idIndex| idOHE| features|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> |183601| 0.38693226548356197|0.04485291680169634|83240.0|(100000,[83240],[...|(100001,[83240,10...|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> only showing top 1 row
>>>
>>> #########
>>> 296000
>>> +------+---------------------------+-------------------+
>>> | id|label | feat2|
>>> +------+---------------------------+-------------------+
>>> |137008| 0.2996020619810592|0.38693226548356197|
>>> +------+---------------------------+-------------------+
>>> only showing top 1 row
>>>
>>> This dataframe has 296000 rows (and therefore 296000 levels will be one hot encoded)
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> | id|label | feat2|idIndex| idOHE| features|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> |137008| 0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> only showing top 1 row
>>>
>>> #########
>>> 492000
>>> +------+---------------------------+-------------------+
>>> | id|label | feat2|
>>> +------+---------------------------+-------------------+
>>> |534351| 0.9450641392552516|0.23472935141246665|
>>> +------+---------------------------+-------------------+
>>> only showing top 1 row
>>>
>>> This dataframe has 492000 rows (and therefore 492000 levels will be one hot encoded)
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> | id|label | feat2|idIndex| idOHE| features|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> |534351| 0.9450641392552516|0.23472935141246665| 3656.0|(492000,[3656],[1...|(492001,[3656,492...|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> only showing top 1 row
>>>
>>> #########
>>> 688000
>>> +------+---------------------------+------------------+
>>> | id|label | feat2|
>>> +------+---------------------------+------------------+
>>> |573008| 0.3059347083549171|0.4846147657830415|
>>> +------+---------------------------+------------------+
>>> only showing top 1 row
>>>
>>> This dataframe has 688000 rows (and therefore 688000 levels will be one hot encoded)
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> | id|label | feat2| idIndex| idOHE| features|
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> |573008| 0.3059347083549171|0.4846147657830415|475855.0|(688000,[475855],...|(688001,[475855,6...|
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> only showing top 1 row
>>>
>>> #########
>>> 884000
>>> +------+---------------------------+------------------+
>>> | id|label | feat2|
>>> +------+---------------------------+------------------+
>>> |970195| 0.34345290476989165|0.9843176058907069|
>>> +------+---------------------------+------------------+
>>> only showing top 1 row
>>>
>>> This dataframe has 884000 rows (and therefore 884000 levels will be one hot encoded)
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> | id|label | feat2| idIndex| idOHE| features|
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> |970195| 0.34345290476989165|0.9843176058907069|333915.0|(884000,[333915],...|(884001,[333915,8...|
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> only showing top 1 row
>>>
>>> #########
>>> 1080000
>>> +------+---------------------------+-----------------+
>>> | id|label | feat2|
>>> +------+---------------------------+-----------------+
>>> |403758| 0.6333344187975314|0.774327685753309|
>>> +------+---------------------------+-----------------+
>>> only showing top 1 row
>>>
>>> This dataframe has 1080000 rows (and therefore 1080000 levels will be one hot encoded)
>>> +------+---------------------------+-----------------+--------+--------------------+--------------------+
>>> | id|label | feat2| idIndex| idOHE| features|
>>> +------+---------------------------+-----------------+--------+--------------------+--------------------+
>>> |403758| 0.6333344187975314|0.774327685753309|287898.0|(1080000,[287898]...|(1080001,[287898,...|
>>> +------+---------------------------+-----------------+--------+--------------------+--------------------+
>>> only showing top 1 row
>>>
>>> #########
>>> 1276000
>>> +------+---------------------------+------------------+
>>> | id|label | feat2|
>>> +------+---------------------------+------------------+
>>> |508726| 0.2513814327408137|0.8480577183702391|
>>> +------+---------------------------+------------------+
>>> only showing top 1 row
>>>
>>> This dataframe has 1276000 rows (and therefore 1276000 levels will be one hot encoded)
>>>
>>> ---------------------------------------------------------------------------
>>> Py4JJavaError                             Traceback (most recent call last)
>>> <ipython-input-2-f5c9fe263872> in <module>()
>>>      38     pipeline = Pipeline(stages=stages)
>>>      39     pipeline_fitted = pipeline.fit(df2)
>>> ---> 40     df3 = pipeline_fitted.transform(df2)
>>>      41     df3.show(1)
>>>      42     dfr.unpersist()
>>>
>>> /opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
>>>     103                 return self.copy(params)._transform(dataset)
>>>     104             else:
>>> --> 105                 return self._transform(dataset)
>>>     106         else:
>>>     107             raise ValueError("Params must be a param map but got %s." % type(params))
>>>
>>> /opt/spark/2.0.0/python/pyspark/ml/pipeline.py in _transform(self, dataset)
>>>     196     def _transform(self, dataset):
>>>     197         for t in self.stages:
>>> --> 198             dataset = t.transform(dataset)
>>>     199         return dataset
>>>     200
>>>
>>> /opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
>>>     103                 return self.copy(params)._transform(dataset)
>>>     104             else:
>>> --> 105                 return self._transform(dataset)
>>>     106         else:
>>>     107             raise ValueError("Params must be a param map but got %s." % type(params))
>>>
>>> /opt/spark/2.0.0/python/pyspark/ml/wrapper.py in _transform(self, dataset)
>>>     227     def _transform(self, dataset):
>>>     228         self._transfer_params_to_java()
>>> --> 229         return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
>>>     230
>>>     231
>>>
>>> /opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>>     931         answer = self.gateway_client.send_command(command)
>>>     932         return_value = get_return_value(
>>> --> 933             answer, self.gateway_client, self.target_id, self.name)
>>>     934
>>>     935         for temp_arg in temp_args:
>>>
>>> /opt/spark/2.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
>>>      61     def deco(*a, **kw):
>>>      62         try:
>>> ---> 63             return f(*a, **kw)
>>>      64         except py4j.protocol.Py4JJavaError as e:
>>>      65             s = e.java_exception.toString()
>>>
>>> /opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>>>     310                 raise Py4JJavaError(
>>>     311                     "An error occurred while calling {0}{1}{2}.\n".
>>> --> 312                     format(target_id, ".", name), value)
>>>     313             else:
>>>     314                 raise Py4JError(
>>>
>>> Py4JJavaError: An error occurred while calling o408.transform.
>>> : java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>     at scala.collection.immutable.Stream$.from(Stream.scala:1262)
>>>     at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
>>>     at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
>>>     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
>>>     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
>>>     at scala.collection.LinearSeqOptimized$class.loop$1(LinearSeqOptimized.scala:274)
>>>     at scala.collection.LinearSeqOptimized$class.lengthCompare(LinearSeqOptimized.scala:277)
>>>     at scala.collection.immutable.Stream.lengthCompare(Stream.scala:202)
>>>     at scala.collection.SeqViewLike$Zipped$class.length(SeqViewLike.scala:133)
>>>     at scala.collection.SeqViewLike$$anon$9.length(SeqViewLike.scala:203)
>>>     at scala.collection.SeqViewLike$Mapped$class.length(SeqViewLike.scala:66)
>>>     at scala.collection.SeqViewLike$$anon$3.length(SeqViewLike.scala:197)
>>>     at scala.collection.SeqLike$class.size(SeqLike.scala:106)
>>>     at scala.collection.SeqViewLike$AbstractTransformed.size(SeqViewLike.scala:37)
>>>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:285)
>>>     at scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
>>>     at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
>>>     at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
>>>     at scala.Option.map(Option.scala:146)
>>>     at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
>>>     at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
>>>     at org.apache.spark.ml.attribute.AttributeGroup$.fromMetadata(AttributeGroup.scala:234)
>>>     at org.apache.spark.ml.attribute.AttributeGroup$.fromStructField(AttributeGroup.scala:246)
>>>     at org.apache.spark.ml.feature.OneHotEncoder.transform(OneHotEncoder.scala:139)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>>>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>     at py4j.Gateway.invoke(Gateway.java:280)
>>>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>>
>>>
>>> Spark Properties
>>> Name                                Value
>>> spark.app.name                      pyspark-shell
>>> spark.driver.cores                  1
>>> spark.driver.extraJavaOptions       -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
>>> spark.driver.memory                 2g
>>> spark.dynamicAllocation.enabled     FALSE
>>> spark.eventLog.dir                  hdfs:///spark/history
>>> spark.eventLog.enabled              TRUE
>>> spark.executor.cores                1
>>> spark.executor.extraJavaOptions     -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
>>> spark.executor.id                   driver
>>> spark.executor.instances            128
>>> spark.executor.memory               2g
>>> spark.history.fs.logDirectory       hdfs:///spark/history
>>> spark.master                        yarn-client
>>> spark.memory.fraction               0.7
>>> spark.memory.storageFraction        0.5
>>> spark.rdd.compress                  TRUE
>>> spark.scheduler.mode                FIFO
>>> spark.serializer.objectStreamReset  100
>>> spark.shuffle.service.enabled       FALSE
>>> spark.speculation                   TRUE
>>> spark.submit.deployMode             client
>>> spark.task.maxFailures              10
>>> spark.yarn.executor.memoryOverhead  2048
>>> spark.yarn.isPython                 TRUE
>>>
>>>
>>> On Aug 11, 2016, at 10:24 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>
>>> Ok, interesting. Would be interested to see how it compares.
>>>
>>> By the way, the feature size you select for the hasher should be a power of
>>> 2 (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes
>>> are evenly distributed (see the section on HashingTF under
>>> http://spark.apache.org/docs/latest/ml-features.html#tf-idf).
>>>
>>> On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen <bteeu...@gmail.com> wrote:
>>>>
>>>> Thanks Nick, I played around with the hashing trick. When I set
>>>> numFeatures to the number of distinct values of the largest sparse
>>>> feature, I ended up with half of them colliding. When I raised
>>>> numFeatures to get fewer collisions, I soon ran into the same memory
>>>> problems as before. To be honest, I didn’t test the impact of more or
>>>> fewer collisions on the quality of the predictions, but tunnel-visioned
>>>> on getting it to work with the full sparsity.
>>>>
>>>> Before, I worked in RDD land (zipWithIndex on an RDD of distinct values
>>>> plus one entry ‘missing’ for values missing during predict, collectAsMap,
>>>> broadcast the map, a UDF generating the sparse vector, and assembling the
>>>> vectors manually).
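Nick's power-of-two advice and Ben's collision observation can be explored with a small pure-Python sketch of the hashing trick. It illustrates the idea under stated assumptions and is not Spark's HashingTF. md5 is a stand-in hash chosen only because it is deterministic, since Spark uses its own hash function. The helper names are hypothetical and the levels are synthetic strings. For a few power-of-two feature sizes, the sketch reports what fraction of distinct levels share a bucket with at least one other level.

```python
import hashlib
from collections import Counter


def hashed_index(value, num_features):
    """Map a categorical value to a bucket in [0, num_features).

    md5 is a stand-in chosen only because it is deterministic; Spark's
    HashingTF uses its own hash function, so real bucket numbers differ.
    """
    digest = hashlib.md5(str(value).encode("utf-8")).hexdigest()
    return int(digest, 16) % num_features


def collision_stats(values, num_features):
    """Return (fraction of distinct values sharing a bucket, buckets used)."""
    distinct = set(values)
    counts = Counter(hashed_index(v, num_features) for v in distinct)
    shared = sum(n for n in counts.values() if n > 1)
    return shared / float(len(distinct)), len(counts)


# Synthetic categorical levels; real data would come from the feature column.
levels = ["id_%d" % i for i in range(100000)]
for bits in (17, 20, 24):
    frac, used = collision_stats(levels, 2 ** bits)
    print("2**%d buckets: %.1f%% of levels share a bucket, %d buckets used"
          % (bits, 100 * frac, used))
```

With 100,000 levels and 2**17 buckets (about 1.3 buckets per level), roughly half of the levels should share a bucket. That is the same order as what Ben reports. The fraction drops quickly as numFeatures grows.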
>>>>
>>>> To move into dataframe land, I wrote:
>>>>
>>>> from collections import defaultdict
>>>> import logging
>>>>
>>>> # Assumes df2 (training DataFrame), cat_int (list of categorical columns),
>>>> # path (output directory) and spark (SparkSession) are defined elsewhere.
>>>> def getMappings(mode):
>>>>     mappings = defaultdict(dict)
>>>>     max_index = 0
>>>>     for c in cat_int[:]:  # for every categorical variable
>>>>
>>>>         logging.info("starting with {}".format(c))
>>>>         if mode == 'train':
>>>>             grouped = (df2
>>>>                 .groupBy(c).count().orderBy('count', ascending=False)  # get counts, ordered from largest to smallest
>>>>                 .selectExpr("*", "1 as n")  # prepare for window function summing up 1s before current row to create a RANK
>>>>                 .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
>>>>                 .drop('n')  # drop the column with static 1 values used for the cumulative sum
>>>>             )
>>>>             logging.info("Got {} rows.".format(grouped.count()))
>>>>             grouped.show()
>>>>             logging.info('getting max')
>>>>             # update the max index so the next categorical feature starts with it
>>>>             max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>>>>             logging.info("max_index has become: {}".format(max_index))
>>>>             logging.info('adding missing value, so we also train on this and prediction data missing it.')
>>>>             schema = grouped.schema
>>>>             logging.info(schema)
>>>>             # add an index for the missing value, for values during predict that are unseen during training
>>>>             grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))
>>>>             max_index += 1
>>>>             saveto = "{}/{}".format(path, c)
>>>>             logging.info("Writing to: {}".format(saveto))
>>>>             grouped.write.parquet(saveto, mode='overwrite')
>>>>
>>>>         elif mode == 'predict':
>>>>             loadfrom = "{}/{}".format(path, c)
>>>>             logging.info("Reading from: {}".format(loadfrom))
>>>>             grouped = spark.read.parquet(loadfrom)
>>>>
>>>>         logging.info("Adding to dictionary")
>>>>         # build up the dictionary to be broadcast later on, used for creating sparse vectors
>>>>         mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()
>>>>         max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>>>>
>>>>     logging.info("Sanity check for indexes:")
>>>>     for c in cat_int[:]:
>>>>         # some logging to confirm the indexes
>>>>         logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))
>>>>         logging.info("Missing value = {}".format(mappings[c]['missing']))
>>>>     return max_index, mappings
>>>>
>>>> I’d love to see the StringIndexer + OneHotEncoder transformers cope with
>>>> missing values during prediction; for now I’ll work with the hacked stuff
>>>> above :).
>>>> (.. and I should compare the performance with using the hashing trick.)
>>>>
>>>> Ben
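The indexing scheme in getMappings can be sketched in plain Python to check the index layout without Spark. It ranks each column's values by frequency, continues numbering where the previous column stopped, and reserves one 'missing' slot per column. This is a hypothetical re-implementation, not Ben's code. Rows are dicts, build_mappings and encode_row are made-up names, and frequency ties are broken by value for determinism. Indexes start at 0, whereas the SQL window sum in getMappings starts each column at max_index + 1, so its first index is 1.

```python
from collections import Counter


def build_mappings(rows, columns):
    """Assign every (column, value) pair a global index, most frequent first.

    Each column gets a contiguous block of indexes that starts where the
    previous column's block ended and finishes with a per-column 'missing'
    slot. Returns (mappings, total_size).
    """
    mappings = {}
    next_index = 0
    for c in columns:
        counts = Counter(row.get(c) for row in rows)
        # Most frequent first; ties broken by the value's string form.
        ranked = sorted(counts, key=lambda v: (-counts[v], str(v)))
        mappings[c] = {v: next_index + i for i, v in enumerate(ranked)}
        next_index += len(ranked)
        mappings[c]["missing"] = next_index  # slot for values unseen in training
        next_index += 1
    return mappings, next_index


def encode_row(row, mappings, size):
    """Return (size, sorted indices, values) with one active index per column."""
    indices = sorted(
        mappings[c].get(row.get(c), mappings[c]["missing"]) for c in mappings
    )
    return size, indices, [1.0] * len(indices)


rows = [
    {"color": "red", "shape": "circle"},
    {"color": "red", "shape": "square"},
    {"color": "blue", "shape": "circle"},
]
mappings, total = build_mappings(rows, ["color", "shape"])
print(total)  # 6
print(encode_row({"color": "green", "shape": "square"}, mappings, total))  # (6, [2, 4], [1.0, 1.0])
```

Because the indexes are global, one sparse vector of length total_size can hold all categorical columns of a row. That appears to be why max_index is carried from one column to the next.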