Ok, interesting. I’d be curious to see how it compares.

By the way, the feature size you select for the hasher should be a power of
2 (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes
are evenly distributed (see the section on HashingTF under
http://spark.apache.org/docs/latest/ml-features.html#tf-idf).
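
For example, to bump it to a power of 2 (a quick sketch, reusing the
"features" column and HashingTF setup from the example further down this
thread):

hasher = HashingTF(numFeatures=2**24, inputCol="features", outputCol="features_vector")
hashed = hasher.transform(df)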

On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen <bteeu...@gmail.com> wrote:

> Thanks Nick, I played around with the hashing trick. When I set
> numFeatures to the number of distinct values of the largest sparse
> feature, I ended up with half of them colliding. When I raised
> numFeatures to get fewer collisions, I soon ran into the same memory
> problems as before. To be honest, I didn’t test the impact of more or
> fewer collisions on the quality of the predictions; I tunnel-visioned
> on getting it to work with the full sparsity.
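>
> (Roughly how I gauged the collisions; a sketch, not the exact code I ran. It
> assumes the big column is called "bigfeature" as in the rest of this thread,
> and that it has no nulls:)
>
> from pyspark.ml.feature import HashingTF
> from pyspark.sql import functions as F
> from pyspark.sql.types import IntegerType
>
> distinct_vals = (df2.select("bigfeature").distinct()
>     .withColumn("tokens", F.array(F.concat(F.lit("bigfeature="), F.col("bigfeature").cast("string")))))
> hashed = HashingTF(numFeatures=2**24, inputCol="tokens", outputCol="hashed").transform(distinct_vals)
>
> first_index = F.udf(lambda v: int(v.indices[0]), IntegerType())   # each row holds exactly one token
> n_values  = distinct_vals.count()
> n_buckets = hashed.select(first_index("hashed").alias("i")).distinct().count()
> print("fraction of distinct values lost to collisions: {:.3f}".format(1.0 - float(n_buckets) / n_values))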
>
> Before this, I worked in RDD land: zipWithIndex on an RDD of the distinct
> values (plus one ‘missing’ entry for values unseen at predict time),
> collectAsMap, broadcast the map, then a UDF generating the sparse vectors
> and assembling them manually (sketched again after the function below). To
> move into DataFrame land, I wrote:
>
> from collections import defaultdict   # (logging, spark, df2, cat_int and path are defined elsewhere)
>
> def getMappings(mode):
>     mappings = defaultdict(dict)
>     max_index = 0
>     for c in cat_int[:]:    # for every categorical variable
>         logging.info("starting with {}".format(c))
>         if mode == 'train':
>             grouped = (df2
>                 .groupBy(c).count().orderBy('count', ascending=False)  # get counts, ordered from largest to smallest
>                 .selectExpr("*", "1 as n")  # static 1s, summed by the window function below to create a rank
>                 .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
>                 .drop('n')  # drop the helper column used for the cumulative sum
>                 )
>             logging.info("Got {} rows.".format(grouped.count()))
>             grouped.show()
>             logging.info('getting max')
>             # update the max index so the next categorical feature starts after it
>             max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>             logging.info("max_index has become: {}".format(max_index))
>             logging.info("adding a 'missing' level, so values unseen during training get their own index at predict time")
>             schema = grouped.schema
>             logging.info(schema)
>             grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))
>             max_index += 1
>             saveto = "{}/{}".format(path, c)
>             logging.info("Writing to: {}".format(saveto))
>             grouped.write.parquet(saveto, mode='overwrite')
>
>         elif mode == 'predict':
>             loadfrom = "{}/{}".format(path, c)
>             logging.info("Reading from: {}".format(loadfrom))
>             grouped = spark.read.parquet(loadfrom)
>
>         logging.info("Adding to dictionary")
>         # build up the dictionary to be broadcast later on, used for creating sparse vectors
>         mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()
>         max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>
>     logging.info("Sanity check for indexes:")
>     for c in cat_int[:]:
>         logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))
>         logging.info("Missing value = {}".format(mappings[c]['missing']))
>     return max_index, mappings
>
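> (For reference, the "udf generating sparse vector" step I mentioned above
> looks roughly like this; a sketch, not the exact code I run. It assumes
> sc / spark from the shell and cat_int, df2 as above:)
>
> from pyspark.sql.functions import udf
> from pyspark.ml.linalg import SparseVector, VectorUDT
>
> max_index, mappings = getMappings('train')
> bc = sc.broadcast(mappings)      # broadcast the per-column value -> index maps
>
> def to_sparse(*values):
>     # one hot index per categorical column; the index ranges don't overlap
>     # because max_index is carried over from one column to the next
>     idxs = sorted(bc.value[c].get(v, bc.value[c]['missing'])
>                   for c, v in zip(cat_int, values))
>     return SparseVector(max_index + 1, idxs, [1.0] * len(idxs))
>
> to_sparse_udf = udf(to_sparse, VectorUDT())
> df3 = df2.withColumn("features", to_sparse_udf(*cat_int))
>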
> I’d love to see the StringIndexer + OneHotEncoder transformers cope with
> missing values during prediction; for now I’ll work with the hacked stuff
> above :).
> (.. and I should compare the performance with using the hashing trick.)
>
> Ben
>
>
> On Aug 4, 2016, at 3:44 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> Sure, I understand there are some issues with how StringIndexer currently
> handles this missing-value situation. Your workaround is not ideal, but I
> see that it is probably the only mechanism available right now to avoid
> the problem.
>
> But the OOM issues seem to be more about the feature cardinality (so the
> size of the hashmap to store the feature <-> index mappings).
>
> A nice property of feature hashing is that it implicitly handles unseen
> category labels by setting the coefficient value to 0 (in the absence of a
> hash collision) - basically option 2 from H2O.
>
> Why is that? Well, once you've trained your model you have a (sparse)
> N-dimensional weight vector that will by definition have 0s for unseen
> indexes. At test time, any feature that appears only in your test set or
> new data will be hashed to an index in the weight vector that has value 0.
>
> So it could be useful for both of your problems.
>
> On Thu, 4 Aug 2016 at 15:25 Ben Teeuwen <bteeu...@gmail.com> wrote:
>
>> Hi Nick,
>>
>> Thanks for the suggestion. Reducing the dimensionality is an option, but
>> let’s say I really want to do this :).
>>
>> The reason why it’s so big is that I’m unifying my training and test
>> data, and I don’t want to drop rows in the test data just because one of
>> the features was missing in the training data. I wouldn’t need this
>> workaround if I had a better *strategy in Spark for dealing with
>> missing levels.* How Spark can deal with it:
>>
>>
>> *"Additionally, there are two strategies regarding how StringIndexer will
>> handle unseen labels when you have fit a StringIndexer on one dataset and
>> then use it to transform another:*
>>
>> * • throw an exception (which is the default)*
>> * • skip the row containing the unseen label entirely"*
>> http://spark.apache.org/docs/2.0.0/ml-features.html#stringindexer
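>>
>> (For the "skip" strategy, that would look roughly like this in PySpark; a
>> sketch that assumes handleInvalid is exposed on the Python side in your
>> version, and note there is no "keep unseen as its own level" option in 2.0:)
>>
>> from pyspark.ml.feature import StringIndexer
>>
>> indexer = StringIndexer(inputCol="bigfeature", outputCol="bigfeatureIndex",
>>                         handleInvalid="skip")   # default is "error" (throw)
>> model = indexer.fit(train_df)              # train_df / test_df: hypothetical splits
>> indexed = model.transform(test_df)         # rows with unseen labels get dropped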
>>
>> I like how *H2O* handles this:
>>
>> *"What happens during prediction if the new sample has categorical levels
>> not seen in training? The value will be filled with either special
>> missing level (if trained with missing values and missing_value_handling
>> was set to MeanImputation) or 0.”*
>>
>> https://github.com/h2oai/h2o-3/blob/master/h2o-docs/src/product/tutorials/datascience/DataScienceH2O-Dev.md
>>
>> So, assuming I need to unify the data and make it huge, and trying things
>> out in Scala instead, I see *these kinds of errors*:
>> _____________
>>
>> scala> feedBack(s"Applying string indexers: fitting")
>> 2016-08-04 10:13:20() | Applying string indexers: fitting
>>
>> scala> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
>> pipelined: org.apache.spark.ml.Pipeline = pipeline_83be3b554e3a
>>
>> scala> val dfFitted = pipelined.fit(df)
>> dfFitted: org.apache.spark.ml.PipelineModel = pipeline_83be3b554e3a
>>
>> scala> feedBack(s"Applying string indexers: transforming")
>> 2016-08-04 10:17:29() | Applying string indexers: transforming
>>
>> scala> var df2 = dfFitted.transform(df)
>> df2: org.apache.spark.sql.DataFrame = [myid: string, feature1: int ... 16
>> more fields]
>>
>> scala>
>>
>> scala> feedBack(s"Applying OHE: fitting")
>> 2016-08-04 10:18:07() | Applying OHE: fitting
>>
>> scala> val pipelined2 = new Pipeline().setStages(stagesOhe.toArray)
>> pipelined2: org.apache.spark.ml.Pipeline = pipeline_ba7922a29322
>>
>> scala> val dfFitted2 = pipelined2.fit(df2)
>> 16/08/04 10:21:41 WARN DFSClient: Slow ReadProcessor read fields took
>> 85735ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR
>> downstreamAckTimeNanos: 0, targets: [10.10.66.13:50010, 10.10.95.11:50010,
>> 10.10.95.29:50010]
>> 16/08/04 10:21:41 WARN DFSClient: DFSOutputStream ResponseProcessor
>> exception  for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377
>> java.io.IOException: Bad response ERROR for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 from
>> datanode 10.10.95.11:50010
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
>> 16/08/04 10:21:41 WARN DFSClient: Error Recovery for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 in
>> pipeline 10.10.66.13:50010, 10.10.95.11:50010, 10.10.95.29:50010: bad
>> datanode 10.10.95.11:50010
>> dfFitted2: org.apache.spark.ml.PipelineModel = pipeline_ba7922a29322
>>
>> scala> feedBack(s"Applying OHE: transforming")
>> 2016-08-04 10:29:12() | Applying OHE: transforming
>>
>> scala> df2 = dfFitted2.transform(df2).cache()
>> 16/08/04 10:34:18 WARN DFSClient: DFSOutputStream ResponseProcessor
>> exception  for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993414608
>> java.io.EOFException: Premature EOF: no length prefix available
>>         at
>> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2203)
>>         at
>> org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
>> 16/08/04 10:34:18 WARN DFSClient: Error Recovery for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993414608 in
>> pipeline 10.10.66.13:50010, 10.10.66.3:50010, 10.10.95.29:50010: bad
>> datanode 10.10.66.13:50010
>> 16/08/04 10:36:03 WARN DFSClient: Slow ReadProcessor read fields took
>> 74146ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: SUCCESS
>> status: ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.3:50010,
>> 10.10.66.1:50010, 10.10.95.29:50010]
>> 16/08/04 10:36:03 WARN DFSClient: DFSOutputStream ResponseProcessor
>> exception  for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488
>> java.io.IOException: Bad response ERROR for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 from
>> datanode 10.10.95.29:50010
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
>> 16/08/04 10:36:03 WARN DFSClient: Error Recovery for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 in
>> pipeline 10.10.66.3:50010, 10.10.66.1:50010, 10.10.95.29:50010: bad
>> datanode 10.10.95.29:50010
>> 16/08/04 10:40:48 WARN DFSClient: Slow ReadProcessor read fields took
>> 60891ms (threshold=30000ms); ack: seqno: -2 status:
>>
>> ____________________
>>
>> After 40 minutes or so, with no activity in the application master, it
>> dies.
>>
>> Ben
>>
>> On Aug 4, 2016, at 12:14 PM, Nick Pentreath <nick.pentre...@gmail.com>
>> wrote:
>>
>> Hi Ben
>>
>> Perhaps with this size cardinality it is worth looking at feature hashing
>> for your problem. Spark has the HashingTF transformer that works on a
>> column of "sentences" (i.e. [string]).
>>
>> For categorical features you can hack it a little by converting your
>> feature value into a ["feature_name=feature_value"] representation. Then
>> HashingTF can be used as is. Note you can also just do ["feature_value"],
>> but the former would allow you, with a bit of munging, to hash all your
>> feature columns at the same time.
>>
>> The advantage is speed and bounded memory footprint. The disadvantages
>> include (i) no way to reverse the mapping from feature_index ->
>> feature_name; (ii) potential for hash collisions (can be helped a bit by
>> increasing your feature vector size).
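>>
>> (A very rough back-of-envelope on the memory side, just to illustrate the
>> difference; it assumes something like 100 bytes per entry for a Java String
>> key plus hash map overhead, so the exact numbers will differ:)
>>
>> num_labels = 56 * 10**6                                             # ~56M distinct values
>> print("label -> index map  : ~%.1f GB" % (num_labels * 100 / 1e9))  # roughly 5.6 GB on the driver
>> print("hashed weight vector: ~%.0f MB" % (2**24 * 8 / 1e6))         # roughly 134 MB of doubles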
>>
>> Here is a minimal example:
>>
>> In [1]: from pyspark.ml.feature import StringIndexer, OneHotEncoder, HashingTF
>> In [2]: from pyspark.sql.types import StringType, ArrayType
>> In [3]: from pyspark.sql.functions import udf
>>
>> In [4]: df = spark.createDataFrame([(0, "foo"), (1, "bar"), (2, "foo"), (3, "baz")], ["id", "feature"])
>>
>> In [5]: to_array = udf(lambda s: ["feature=%s" % s], ArrayType(StringType()))
>>
>> In [6]: df = df.withColumn("features", to_array("feature"))
>>
>> In [7]: df.show()
>> +---+-------+-------------+
>> | id|feature|     features|
>> +---+-------+-------------+
>> |  0|    foo|[feature=foo]|
>> |  1|    bar|[feature=bar]|
>> |  2|    foo|[feature=foo]|
>> |  3|    baz|[feature=baz]|
>> +---+-------+-------------+
>>
>> In [8]: indexer = StringIndexer(inputCol="feature", outputCol="feature_index")
>>
>> In [9]: indexed = indexer.fit(df).transform(df)
>>
>> In [10]: encoder = OneHotEncoder(dropLast=False, inputCol="feature_index", outputCol="feature_vector")
>>
>> In [11]: encoded = encoder.transform(indexed)
>>
>> In [12]: encoded.show()
>> +---+-------+-------------+-------------+--------------+
>> | id|feature|     features|feature_index|feature_vector|
>> +---+-------+-------------+-------------+--------------+
>> |  0|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
>> |  1|    bar|[feature=bar]|          2.0| (3,[2],[1.0])|
>> |  2|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
>> |  3|    baz|[feature=baz]|          1.0| (3,[1],[1.0])|
>> +---+-------+-------------+-------------+--------------+
>>
>> In [22]: hasher = HashingTF(numFeatures=2**8, inputCol="features", outputCol="features_vector")
>>
>> In [23]: hashed = hasher.transform(df)
>>
>> In [24]: hashed.show()
>> +---+-------+-------------+-----------------+
>> | id|feature|     features|  features_vector|
>> +---+-------+-------------+-----------------+
>> |  0|    foo|[feature=foo]| (256,[59],[1.0])|
>> |  1|    bar|[feature=bar]|(256,[219],[1.0])|
>> |  2|    foo|[feature=foo]| (256,[59],[1.0])|
>> |  3|    baz|[feature=baz]| (256,[38],[1.0])|
>> +---+-------+-------------+-----------------+
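>>
>> (And a sketch of the "hash all your feature columns at once" munging I
>> mentioned above; the second column name here is made up:)
>>
>> from pyspark.sql.functions import array, concat, lit, col
>> from pyspark.ml.feature import HashingTF
>>
>> cat_cols = ["feature", "another_feature"]    # hypothetical list of categorical columns
>> tokens = array(*[concat(lit(c + "="), col(c).cast("string")) for c in cat_cols])
>> df_all = df.withColumn("all_features", tokens)
>> hashed_all = HashingTF(numFeatures=2**24, inputCol="all_features",
>>                        outputCol="all_features_vector").transform(df_all)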
>>
>> On Thu, 4 Aug 2016 at 10:07 Ben Teeuwen <bteeu...@gmail.com> wrote:
>>
>>> I raised driver memory to 30G and spark.driver.maxResultSize to 25G, this
>>> time in pyspark.
>>>
>>> *Code run:*
>>>
>>> cat_int  = ['bigfeature']
>>>
>>> stagesIndex = []
>>> stagesOhe   = []
>>> for c in cat_int:
>>>   stagesIndex.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
>>>   stagesOhe.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))
>>>
>>> df2 = df
>>>
>>> for i in range(len(stagesIndex)):
>>>   logging.info("Starting with {}".format(cat_int[i]))
>>>   model = stagesIndex[i].fit(df2)   # fit once and reuse the fitted model
>>>   logging.info("Fitted. Now transforming:")
>>>   df2 = model.transform(df2)
>>>   logging.info("Transformed. Now showing transformed:")
>>>   df2.show()
>>>   logging.info("OHE")
>>>   df2 = stagesOhe[i].transform(df2)
>>>   logging.info("Fitted. Now showing OHE:")
>>>   df2.show()
>>>
>>> *Now I get this error:*
>>>
>>> 2016-08-04 08:53:44,839 INFO       Starting with bigfeature
>>> ukStringIndexer_442b8e11e3294de9b83a
>>> 2016-08-04 09:06:18,147 INFO       Fitted. Now transforming:
>>> 16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 -
>>> Cannot receive any reply in 120 seconds. This timeout is controlled by
>>> spark.rpc.askTimeout
>>> org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in
>>> 120 seconds. This timeout is controlled by spark.rpc.askTimeout
>>>         at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>>>         at
>>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>>>         at
>>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>>         at
>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>         at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
>>>         at scala.util.Try$.apply(Try.scala:192)
>>>         at scala.util.Failure.recover(Try.scala:216)
>>>         at
>>> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
>>>         at
>>> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
>>>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>         at
>>> org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
>>>         at
>>> scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
>>>         at
>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>         at
>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>         at scala.concurrent.Promise$class.complete(Promise.scala:55)
>>>         at
>>> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
>>>         at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>>>         at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>>>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>         at
>>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>>>         at
>>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>>>         at
>>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>>>         at
>>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>>>         at
>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>         at
>>> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>>>         at
>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>>         at
>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>>>         at
>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>>         at
>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>         at
>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>         at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
>>>         at
>>> scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
>>>         at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
>>>         at
>>> org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
>>>         at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>         at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>>>         at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.util.concurrent.TimeoutException: Cannot receive any
>>> reply in 120 seconds
>>>         ... 8 more
>>> 16/08/04 09:10:45 WARN TransportResponseHandler: Ignoring response for
>>> RPC 4858888672840406395 from /10.10.80.4:59931 (47 bytes) since it is
>>> not outstanding
>>> 2016-08-04 09:12:07,016 INFO       Transformed. Now showing
>>> transformed:
>>> 16/08/04 09:13:48 WARN DFSClient: Slow ReadProcessor read fields took
>>> 71756ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR
>>> downstreamAckTimeNanos: 0, targets: [10.10.66.5:50010, 10.10.10.12:50010,
>>> 10.10.91.9:50010]
>>> 16/08/04 09:13:48 WARN DFSClient: DFSOutputStream ResponseProcessor
>>> exception  for block
>>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265
>>> java.io.IOException: Bad response ERROR for block
>>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 from
>>> datanode 10.10.10.12:50010
>>>         at
>>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
>>> 16/08/04 09:13:48 WARN DFSClient: Error Recovery for block
>>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 in
>>> pipeline 10.10.66.5:50010, 10.10.10.12:50010, 10.192.91.9:50010: bad
>>> datanode 10.10.10.12:50010
>>> Traceback (most recent call last):
>>>   File "<stdin>", line 7, in <module>
>>>   File "/opt/spark/2.0.0/python/pyspark/sql/dataframe.py", line 287, in
>>> show
>>>     print(self._jdf.showString(n, truncate))
>>>   File
>>> "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
>>> line 933, in __call__
>>>   File "/opt/spark/2.0.0/python/pyspark/sql/utils.py", line 63, in deco
>>>     return f(*a, **kw)
>>>   File
>>> "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line
>>> 312, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> o95.showString.
>>> : java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>>>         at java.util.Arrays.copyOf(Arrays.java:2271)
>>>         at
>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>>         at
>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>>         at
>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>>         at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>>         at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>>         at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>>         at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>         at
>>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>>>         at
>>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>>>         at
>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>>>         at
>>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>>>         at
>>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>>>         at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>>>         at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
>>>         at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
>>>         at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>         at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>>>         at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
>>>         at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>>>         at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>>>         at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>>>         at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>>>         at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>         at
>>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>>>         at
>>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>>>         at
>>> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
>>>         at
>>> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
>>>         at
>>> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>>>         at
>>> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
>>>         at
>>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>>>
>>> Ben
>>>
>>> On Aug 3, 2016, at 4:00 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I want to one hot encode a column containing 56 million distinct values.
>>> My dataset is 800m rows + 17 columns.
>>> I first apply a StringIndexer, but it already breaks there, giving an *OOM
>>> java heap space error*.
>>>
>>> *I launch my app on YARN with:*
>>> /opt/spark/2.0.0/bin/spark-shell --executor-memory 10G --num-executors 128 --executor-cores 2 --driver-memory 12G --conf spark.driver.maxResultSize=8G
>>>
>>> *After grabbing the data, I run:*
>>>
>>> val catInts = Array("bigfeature")
>>>
>>> val stagesIndex = scala.collection.mutable.ArrayBuffer.empty[StringIndexer]
>>> val stagesOhe = scala.collection.mutable.ArrayBuffer.empty[OneHotEncoder]
>>> for (c <- catInts) {
>>>   println(s"starting with $c")
>>>   val i = new StringIndexer()
>>>     .setInputCol(c)
>>>     .setOutputCol(s"${c}Index")
>>>   stagesIndex += i
>>>
>>>   val o = new OneHotEncoder()
>>>     .setDropLast(false)
>>>     .setInputCol(s"${c}Index")
>>>     .setOutputCol(s"${c}OHE")
>>>   stagesOhe += o
>>> }
>>>
>>> println(s"Applying string indexers: fitting")
>>> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
>>> val dfFitted = pipelined.fit(df)
>>>
>>>
>>> *Then, the application master* shows a "countByValue at
>>> StringIndexer.scala" taking 1.8 minutes (so very fast). Afterwards, the
>>> shell console hangs for a while. *What is it doing now?* After some time,
>>> it shows:
>>>
>>> scala> val dfFitted = pipelined.fit(df)
>>>                                                                       
>>> java.lang.OutOfMemoryError: Java heap space
>>>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
>>>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
>>>   at 
>>> org.apache.spark.util.collection.OpenHashMap$$anonfun$1.apply$mcVI$sp(OpenHashMap.scala:159)
>>>   at 
>>> org.apache.spark.util.collection.OpenHashSet.rehash(OpenHashSet.scala:230)
>>>   at 
>>> org.apache.spark.util.collection.OpenHashSet.rehashIfNeeded(OpenHashSet.scala:167)
>>>   at 
>>> org.apache.spark.util.collection.OpenHashMap$mcD$sp.update$mcD$sp(OpenHashMap.scala:86)
>>>   at 
>>> org.apache.spark.ml.feature.StringIndexerModel.<init>(StringIndexer.scala:137)
>>>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:93)
>>>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66)
>>>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
>>>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>>   at 
>>> scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
>>>   at 
>>> scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
>>>   at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
>>>   ... 16 elided
>>>
>>>
>>>
>>>
>>>
>>>
>>
>
