Ok, interesting. Would be interested to see how it compares. By the way, the feature size you select for the hasher should be a power of 2 (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes are evenly distributed (see the section on HashingTF under http://spark.apache.org/docs/latest/ml-features.html#tf-idf).
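To put rough numbers on those suggested sizes: with n distinct values hashed uniformly into m buckets, the chance that a given value shares its index with at least one other value is about 1 - (1 - 1/m)^(n-1) ≈ 1 - e^(-n/m). A quick back-of-envelope sketch (plain Python under a uniform-hash assumption, not Spark code) for the 56M-cardinality feature discussed later in this thread:

```python
import math

def collision_fraction(n_values, num_features):
    """Approximate probability that a given value's hashed index is
    shared with at least one of the other values, assuming a uniform
    hash: 1 - (1 - 1/m)^(n-1) ~= 1 - exp(-n/m)."""
    return 1.0 - math.exp(-float(n_values - 1) / num_features)

n = 56 * 10**6  # distinct values of the large categorical feature below
for bits in (24, 26, 28):
    print("numFeatures=2**{}: ~{:.0%} of values collide".format(
        bits, collision_fraction(n, 2**bits)))
```

Even at 2**26 buckets, roughly half the values share an index with some other value, which matches the collision rates Ben reports below; the hashing trick deliberately trades some collisions for a bounded memory footprint.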
On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen <bteeu...@gmail.com> wrote:

> Thanks Nick, I played around with the hashing trick. When I set
> numFeatures to the amount of distinct values for the largest sparse
> feature, I ended up with half of them colliding. When raising the
> numFeatures to have fewer collisions, I soon ended up with the same memory
> problems as before. To be honest, I didn’t test the impact of having more
> or fewer collisions on the quality of the predictions, but tunnel-visioned
> into getting it to work with the full sparsity.
>
> Before, I worked in RDD land: zipWithIndex on an rdd with distinct values +
> one entry ‘missing’ for missing values during predict, collectAsMap,
> broadcast the map, a udf generating sparse vectors, assembling the vectors
> manually. To move into dataframe land, I wrote:
>
> def getMappings(mode):
>     mappings = defaultdict(dict)
>     max_index = 0
>     for c in cat_int[:]:  # for every categorical variable
>         logging.info("starting with {}".format(c))
>         if mode == 'train':
>             grouped = (df2
>                 .groupBy(c).count().orderBy('count', ascending=False)  # get counts, ordered from largest to smallest
>                 .selectExpr("*", "1 as n")  # prepare for window function summing up 1s before current row to create a RANK
>                 .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
>                 .drop('n'))  # drop the column with static 1 values used for the cumulative sum
>             logging.info("Got {} rows.".format(grouped.count()))
>             grouped.show()
>             logging.info('getting max')
>             # update the max index so the next categorical feature starts from it
>             max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>             logging.info("max_index has become: {}".format(max_index))
>             logging.info('adding missing value, so we also train on this and prediction data missing it.')
>             schema = grouped.schema
>             logging.info(schema)
>             # add an index for the 'missing' value, for values during predict that are unseen during training
>             grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))
>             max_index += 1
>             saveto = "{}/{}".format(path, c)
>             logging.info("Writing to: {}".format(saveto))
>             grouped.write.parquet(saveto, mode='overwrite')
>
>         elif mode == 'predict':
>             loadfrom = "{}/{}".format(path, c)
>             logging.info("Reading from: {}".format(loadfrom))
>             grouped = spark.read.parquet(loadfrom)
>
>         logging.info("Adding to dictionary")
>         # build up a dictionary to be broadcast later on, used for creating sparse vectors
>         mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()
>         max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>
>     logging.info("Sanity check for indexes:")
>     for c in cat_int[:]:
>         # some logging to confirm the indexes
>         logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))
>         logging.info("Missing value = {}".format(mappings[c]['missing']))
>     return max_index, mappings
>
> I’d love to see the StringIndexer + OneHotEncoder transformers cope with
> missing values during prediction; for now I’ll work with the hacked stuff
> above :).
> (.. and I should compare the performance with using the hashing trick.)
>
> Ben
>
>
> On Aug 4, 2016, at 3:44 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> Sure, I understand there are some issues with handling this missing value
> situation in StringIndexer currently. Your workaround is not ideal but I
> see that it is probably the only mechanism available currently to avoid the
> problem.
>
> But the OOM issues seem to be more about the feature cardinality (so the
> size of the hashmap to store the feature <-> index mappings).
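To put a rough number on that cardinality point: a driver-side map of 56 million string keys to integer indexes costs on the order of 100+ bytes per entry on the JVM. The per-entry figures in this sketch are assumed round numbers for illustration, not measurements:

```python
# Back-of-envelope for a JVM map of String -> index over 56M entries.
# Per-entry byte costs below are ASSUMED round numbers, not measured:
n_entries = 56 * 10**6
string_obj = 40       # assumed: String object header + fields + padding
char_data = 2 * 20    # assumed ~20-char keys at 2 bytes/char (UTF-16)
entry_and_box = 48    # assumed: map entry + boxed integer + table slot

gb = n_entries * (string_obj + char_data + entry_and_box) / float(2**30)
print("~{:.1f} GB just for the value -> index map".format(gb))
```

Under those assumptions the map alone is several gigabytes, and transient copies exist while it is built and serialized (the countByValue result, serialization buffers, the fitted model), which is consistent with a 12-30 GB driver still hitting heap limits.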
> > A nice property of feature hashing is that it implicitly handles unseen > category labels by setting the coefficient value to 0 (in the absence of a > hash collision) - basically option 2 from H2O. > > Why is that? Well once you've trained your model you have a (sparse) > N-dimensional weight vector that will be definition have 0s for unseen > indexes. At test time, any feature that only appears in your test set or > new data will be hashed to an index in the weight vector that has value 0. > > So it could be useful for both of your problems. > > On Thu, 4 Aug 2016 at 15:25 Ben Teeuwen <bteeu...@gmail.com> wrote: > >> Hi Nick, >> >> Thanks for the suggestion. Reducing the dimensionality is an option, >> thanks, but let’s say I really want to do this :). >> >> The reason why it’s so big is that I’m unifying my training and test >> data, and I don’t want to drop rows in the test data just because one of >> the features was missing in the training data. I wouldn’t need this >> workaround, if I had a better *strategy in Spark for dealing with >> missing levels. *How Spark can deal with it: >> >> >> *"Additionally, there are two strategies regarding how StringIndexer will >> handle unseen labels when you have fit aStringIndexer on one dataset and >> then use it to transform another:* >> >> * • throw an exception (which is the default)* >> * • skip the row containing the unseen label entirely"* >> http://spark.apache.org/docs/2.0.0/ml-features.html#stringindexer >> >> I like how *H2O* handles this; >> >> *"What happens during prediction if the new sample has categorical levels >> not seen in training? 
The value will be filled with either special >> missing level (if trained with missing values and missing_value_handling >> was set to MeanImputation) or 0.”* >> >> https://github.com/h2oai/h2o-3/blob/master/h2o-docs/src/product/tutorials/datascience/DataScienceH2O-Dev.md >> >> So assuming I need to unify the data, make it huge, and trying out more >> in scala, I see *these kinds of errors*: >> _____________ >> >> scala> feedBack(s"Applying string indexers: fitting") >> 2016-08-04 10:13:20() | Applying string indexers: fitting >> >> scala> val pipelined = new Pipeline().setStages(stagesIndex.toArray) >> pipelined: org.apache.spark.ml.Pipeline = pipeline_83be3b554e3a >> >> scala> val dfFitted = pipelined.fit(df) >> dfFitted: org.apache.spark.ml.PipelineModel = pipeline_83be3b554e3a >> >> scala> feedBack(s"Applying string indexers: transforming") >> 2016-08-04 10:17:29() | Applying string indexers: transforming >> >> scala> var df2 = dfFitted.transform(df) >> df2: org.apache.spark.sql.DataFrame = [myid: string, feature1: int ... 
16 >> more fields] >> >> scala> >> >> scala> feedBack(s"Applying OHE: fitting") >> 2016-08-04 10:18:07() | Applying OHE: fitting >> >> scala> val pipelined2 = new Pipeline().setStages(stagesOhe.toArray) >> pipelined2: org.apache.spark.ml.Pipeline = pipeline_ba7922a29322 >> >> scala> val dfFitted2 = pipelined2.fit(df2) >> 16/08/04 10:21:41 WARN DFSClient: Slow ReadProcessor read fields took >> 85735ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR >> downstreamAckTimeNanos: 0, targets: [10.10.66.13:50010, 10.10.95.11:50010, >> 10.10.95.29:50010] >> 16/08/04 10:21:41 WARN DFSClient: DFSOutputStream ResponseProcessor >> exception for block >> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 >> java.io.IOException: Bad response ERROR for block >> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 from >> datanode 10.10.95.11:50010 >> at >> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897) >> 16/08/04 10:21:41 WARN DFSClient: Error Recovery for block >> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 in >> pipeline 10.10.66.13:50010, 10.10.95.11:50010, 10.10.95.29:50010: bad >> datanode 10.10.95.11:50010 >> dfFitted2: org.apache.spark.ml.PipelineModel = pipeline_ba7922a29322 >> >> scala> feedBack(s"Applying OHE: transforming") >> 2016-08-04 10:29:12() | Applying OHE: transforming >> >> scala> df2 = dfFitted2.transform(df2).cache() >> 16/08/04 10:34:18 WARN DFSClient: DFSOutputStream ResponseProcessor >> exception for block >> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993414608 >> java.io.EOFException: Premature EOF: no length prefix available >> at >> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2203) >> at >> org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176) >> at >> 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867) >> 16/08/04 10:34:18 WARN DFSClient: Error Recovery for block >> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993414608 in >> pipeline 10.10.66.13:50010, 10.10.66.3:50010, 10.10.95.29:50010: bad >> datanode 10.10.66.13:50010 >> 16/08/04 10:36:03 WARN DFSClient: Slow ReadProcessor read fields took >> 74146ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: SUCCESS >> status: ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.3:50010, >> 10.10.66.1:50010, 10.10.95.29:50010] >> 16/08/04 10:36:03 WARN DFSClient: DFSOutputStream ResponseProcessor >> exception for block >> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 >> java.io.IOException: Bad response ERROR for block >> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 from >> datanode 10.10.95.29:50010 >> at >> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897) >> 16/08/04 10:36:03 WARN DFSClient: Error Recovery for block >> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 in >> pipeline 10.10.66.3:50010, 10.10.66.1:50010, 10.10.95.29:50010: bad >> datanode 10.10.95.29:50010 >> 16/08/04 10:40:48 WARN DFSClient: Slow ReadProcessor read fields took >> 60891ms (threshold=30000ms); ack: seqno: -2 status: >> >> ____________________ >> >> After 40 minutes or so, with no activity in the application master, it >> dies. >> >> Ben >> >> On Aug 4, 2016, at 12:14 PM, Nick Pentreath <nick.pentre...@gmail.com> >> wrote: >> >> Hi Ben >> >> Perhaps with this size cardinality it is worth looking at feature hashing >> for your problem. Spark has the HashingTF transformer that works on a >> column of "sentences" (i.e. [string]). >> >> For categorical features you can hack it a little by converting your >> feature value into a ["feature_name=feature_value"] representation. 
Then >> HashingTF can be used as is. Note you can also just do ["feature_value"], >> but the former would allow you, with a bit of munging, to hash all your >> feature columns at the same time. >> >> The advantage is speed and bounded memory footprint. The disadvantages >> include (i) no way to reverse the mapping from feature_index -> >> feature_name; (ii) potential for hash collisions (can be helped a bit by >> increasing your feature vector size). >> >> Here is a minimal example: >> >> In [1]: from pyspark.ml.feature import StringIndexer, OneHotEncoder, >> HashingTF >> In [2]: from pyspark.sql.types import StringType, ArrayType >> In [3]: from pyspark.sql.functions import udf >> >> In [4]: df = spark.createDataFrame([(0, "foo"), (1, "bar"), (2, "foo"), >> (3, "baz")], ["id", "feature"]) >> >> In [5]: to_array = udf(lambda s: ["feature=%s" % s], >> ArrayType(StringType())) >> >> In [6]: df = df.withColumn("features", to_array("feature")) >> >> In [7]: df.show() >> +---+-------+-------------+ >> | id|feature| features| >> +---+-------+-------------+ >> | 0| foo|[feature=foo]| >> | 1| bar|[feature=bar]| >> | 2| foo|[feature=foo]| >> | 3| baz|[feature=baz]| >> +---+-------+-------------+ >> >> In [8]: indexer = StringIndexer(inputCol="feature", >> outputCol="feature_index") >> >> In [9]: indexed = indexer.fit(df).transform(df) >> >> In [10]: encoder = OneHotEncoder(dropLast=False, >> inputCol="feature_index", outputCol="feature_vector") >> >> In [11]: encoded = encoder.transform(indexed) >> >> In [12]: encoded.show() >> +---+-------+-------------+-------------+--------------+ >> | id|feature| features|feature_index|feature_vector| >> +---+-------+-------------+-------------+--------------+ >> | 0| foo|[feature=foo]| 0.0| (3,[0],[1.0])| >> | 1| bar|[feature=bar]| 2.0| (3,[2],[1.0])| >> | 2| foo|[feature=foo]| 0.0| (3,[0],[1.0])| >> | 3| baz|[feature=baz]| 1.0| (3,[1],[1.0])| >> +---+-------+-------------+-------------+--------------+ >> >> In [22]: hasher = 
HashingTF(numFeatures=2**8, inputCol="features", >> outputCol="features_vector") >> >> In [23]: hashed = hasher.transform(df) >> >> In [24]: hashed.show() >> +---+-------+-------------+-----------------+ >> | id|feature| features| features_vector| >> +---+-------+-------------+-----------------+ >> | 0| foo|[feature=foo]| (256,[59],[1.0])| >> | 1| bar|[feature=bar]|(256,[219],[1.0])| >> | 2| foo|[feature=foo]| (256,[59],[1.0])| >> | 3| baz|[feature=baz]| (256,[38],[1.0])| >> +---+-------+-------------+-----------------+ >> >> On Thu, 4 Aug 2016 at 10:07 Ben Teeuwen <bteeu...@gmail.com> wrote: >> >>> I raised driver memory to 30G and maxresultsize to 25G, this time in >>> pyspark. >>> >>> *Code run:* >>> >>> cat_int = ['bigfeature'] >>> >>> stagesIndex = [] >>> stagesOhe = [] >>> for c in cat_int: >>> stagesIndex.append(StringIndexer(inputCol=c, >>> outputCol="{}Index".format(c))) >>> stagesOhe.append(OneHotEncoder(dropLast= False, inputCol = >>> "{}Index".format(c), outputCol = "{}OHE".format(c))) >>> >>> df2 = df >>> >>> for i in range(len(stagesIndex)): >>> logging.info("Starting with {}".format(cat_int[i])) >>> stagesIndex[i].fit(df2) >>> logging.info("Fitted. Now transforming:") >>> df2 = stagesIndex[i].fit(df2).transform(df2) >>> logging.info("Transformed. Now showing transformed:") >>> df2.show() >>> logging.info("OHE") >>> df2 = stagesOhe[i].transform(df2) >>> logging.info("Fitted. Now showing OHE:") >>> df2.show() >>> >>> *Now I get error:* >>> >>> 2016-08-04 08:53:44,839 INFO Starting with bigfeature >>> [57/7074] >>> ukStringIndexer_442b8e11e3294de9b83a >>> 2016-08-04 09:06:18,147 INFO Fitted. Now transforming: >>> 16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 - >>> Cannot receive any reply in 120 seconds. This timeout is controlled by >>> spark.rpc.askTimeout >>> org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in >>> 120 seconds. 
This timeout is controlled by spark.rpc.askTimeout >>> at org.apache.spark.rpc.RpcTimeout.org >>> <http://org.apache.spark.rpc.rpctimeout.org/> >>> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) >>> at >>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) >>> at >>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) >>> at >>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) >>> at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216) >>> at scala.util.Try$.apply(Try.scala:192) >>> at scala.util.Failure.recover(Try.scala:216) >>> at >>> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326) >>> at >>> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326) >>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) >>> at >>> org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) >>> at >>> scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136) >>> at >>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) >>> at >>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) >>> at scala.concurrent.Promise$class.complete(Promise.scala:55) >>> at >>> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) >>> at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) >>> at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) >>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) >>> at >>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63) >>> at >>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78) >>> at >>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55) >>> at >>> 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55) >>> at >>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) >>> at >>> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54) >>> at >>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) >>> at >>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106) >>> at >>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) >>> at >>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) >>> at >>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) >>> at scala.concurrent.Promise$class.tryFailure(Promise.scala:112) >>> at >>> scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153) >>> at org.apache.spark.rpc.netty.NettyRpcEnv.org >>> <http://org.apache.spark.rpc.netty.nettyrpcenv.org/> >>> $apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205) >>> at >>> org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239) >>> at >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) >>> at java.util.concurrent.FutureTask.run(FutureTask.java:262) >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) >>> at >>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>> >>> [13/7074] >>> at java.lang.Thread.run(Thread.java:745) >>> Caused by: java.util.concurrent.TimeoutException: Cannot receive any >>> reply in 120 seconds >>> ... 
8 more >>> 16/08/04 09:10:45 WARN TransportResponseHandler: Ignoring response for >>> RPC 4858888672840406395 from /10.10.80.4:59931 (47 bytes) since it is >>> not outstanding >>> 2016-08-04 09:12:07,016 INFO Transformed. Now showing >>> transformed: >>> 16/08/04 09:13:48 WARN DFSClient: Slow ReadProcessor read fields took >>> 71756ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR >>> downstreamAckTimeNanos: 0, targets: [10.10.66.5:50010, 10.10.10.12:50010, >>> 10.10.91.9:50010] >>> 16/08/04 09:13:48 WARN DFSClient: DFSOutputStream ResponseProcessor >>> exception for block >>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 >>> java.io.IOException: Bad response ERROR for block >>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 from >>> datanode 10.10.10.12:50010 >>> at >>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897) >>> 16/08/04 09:13:48 WARN DFSClient: Error Recovery for block >>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 in >>> pipeline 10.10.66.5:50010, 10.10.10.12:50010, 10.192.91.9:50010: bad >>> datanode 10.10.10.12:50010 >>> Traceback (most recent call last): >>> File "<stdin>", line 7, in <module> >>> File "/opt/spark/2.0.0/python/pyspark/sql/dataframe.py", line 287, in >>> show >>> print(self._jdf.showString(n, truncate)) >>> File >>> "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", >>> line 933, in __call__ >>> File "/opt/spark/2.0.0/python/pyspark/sql/utils.py", line 63, in deco >>> return f(*a, **kw) >>> File >>> "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line >>> 312, in get_return_value >>> py4j.protocol.Py4JJavaError: An error occurred while calling >>> o95.showString. 
>>> : java.lang.OutOfMemoryError: Requested array size exceeds VM limit >>> at java.util.Arrays.copyOf(Arrays.java:2271) >>> at >>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) >>> at >>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) >>> at >>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) >>> at >>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) >>> at >>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) >>> at >>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) >>> at >>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) >>> at >>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43) >>> at >>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) >>> at >>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) >>> at >>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) >>> at >>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) >>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) >>> at >>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798) >>> at >>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) >>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) >>> at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797) >>> at >>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364) >>> at >>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) >>> at >>> 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) >>> at >>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >>> at >>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) >>> at >>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) >>> at >>> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240) >>> at >>> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323) >>> at >>> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) >>> at >>> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183) >>> at >>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) >>> >>> Ben >>> >>> On Aug 3, 2016, at 4:00 PM, Ben Teeuwen <bteeu...@gmail.com> wrote: >>> >>> Hi, >>> >>> I want to one hot encode a column containing 56 million distinct values. >>> My dataset is 800m rows + 17 columns. 
>>> I first apply a StringIndexer, but it already breaks there giving a* OOM >>> java heap space error.* >>> >>> *I launch my app on YARN with:* >>> /opt/spark/2.0.0/bin/spark-shell --executor-memory 10G --num-executors >>> 128 --executor-cores 2 --driver-memory 12G --conf >>> spark.driver.maxResultSize=8G >>> >>> *After grabbing the data, I run:* >>> >>> val catInts = Array(“bigfeature”) >>> >>> val stagesIndex = >>> scala.collection.mutable.ArrayBuffer.empty[StringIndexer] >>> val stagesOhe = scala.collection.mutable.ArrayBuffer.empty[OneHotEncoder] >>> for (c <- catInts) { >>> println(s"starting with $c") >>> val i = new StringIndexer() >>> .setInputCol(c) >>> .setOutputCol(s"${c}Index") >>> stagesIndex += i >>> >>> val o = new OneHotEncoder() >>> .setDropLast(false) >>> .setInputCol(s"${c}Index") >>> .setOutputCol(s"${c}OHE") >>> stagesOhe += o >>> } >>> >>> println(s"Applying string indexers: fitting") >>> val pipelined = new Pipeline().setStages(stagesIndex.toArray) >>> val dfFitted = pipelined.fit(df) >>> >>> >>> *Then, the application master* shows a "countByValue at >>> StringIndexer.scala” taking 1.8 minutes (so very fast). >>> Afterwards, the shell console hangs for a while. *What is it doing >>> now? 
*After some time, it shows: >>> >>> scala> val dfFitted = pipelined.fit(df) >>> >>> java.lang.OutOfMemoryError: Java heap space >>> at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141) >>> at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139) >>> at >>> org.apache.spark.util.collection.OpenHashMap$$anonfun$1.apply$mcVI$sp(OpenHashMap.scala:159) >>> at >>> org.apache.spark.util.collection.OpenHashSet.rehash(OpenHashSet.scala:230) >>> at >>> org.apache.spark.util.collection.OpenHashSet.rehashIfNeeded(OpenHashSet.scala:167) >>> at >>> org.apache.spark.util.collection.OpenHashMap$mcD$sp.update$mcD$sp(OpenHashMap.scala:86) >>> at >>> org.apache.spark.ml.feature.StringIndexerModel.<init>(StringIndexer.scala:137) >>> at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:93) >>> at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66) >>> at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149) >>> at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145) >>> at scala.collection.Iterator$class.foreach(Iterator.scala:893) >>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) >>> at >>> scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44) >>> at >>> scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37) >>> at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145) >>> ... 16 elided >>> >>> >>> >>> >>> >>> >> >
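For reference, the stack trace above dies while StringIndexerModel materializes its full label -> index OpenHashMap on the driver. The hashing trick discussed at the top of the thread sidesteps that map entirely, because index assignment is stateless. A minimal pure-Python sketch of the idea (zlib.crc32 is only a stand-in for illustration here; Spark's HashingTF uses its own hash function):

```python
import zlib

def hashed_index(feature, value, num_features=2**24):
    """Map a 'feature=value' token straight to a column index; no
    fitted dictionary is kept, so memory stays bounded no matter
    how many distinct values exist."""
    token = "{}={}".format(feature, value).encode("utf-8")
    # power-of-2 num_features, per the sizing advice at the top of the thread
    return zlib.crc32(token) % num_features

# Training and prediction call the same stateless function, so a
# category never seen during training still gets a valid index; the
# trained sparse weight vector simply holds 0.0 at that position.
seen = hashed_index("bigfeature", "foo")
unseen = hashed_index("bigfeature", "some-brand-new-value")
assert 0 <= seen < 2**24 and 0 <= unseen < 2**24
assert hashed_index("bigfeature", "foo") == seen  # deterministic
```

This is why there is no fit step to OOM and no unseen-label exception at predict time, at the cost of irreversibility and occasional collisions.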