Sure, I understand there are some issues with how StringIndexer currently
handles unseen labels (missing levels). Your workaround is not ideal, but I
see that it is probably the only mechanism available right now to avoid the
problem.

But the OOM issues seem to be more about the feature cardinality (that is,
the size of the hash map that stores the feature <-> index mappings).

A nice property of feature hashing is that it implicitly handles unseen
category labels by setting the coefficient value to 0 (in the absence of a
hash collision) - basically option 2 from H2O.

Why is that? Well, once you've trained your model you have a (sparse)
N-dimensional weight vector that will by definition have 0s for unseen
indexes. At test time, any feature that only appears in your test set or
new data will be hashed to an index in the weight vector that has value 0
(again, unless it collides with an index seen in training).

So it could be useful for both of your problems.
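
To make the zero-weight argument concrete, here is a minimal pure-Python
sketch. It is not Spark code: CRC32 stands in for whatever hash HashingTF
uses internally, the weights are made up rather than trained, and the helper
names (hash_index, featurize, score, pick_unseen_label) are hypothetical.
The bucket count matches the numFeatures=2**8 used in the HashingTF example
quoted below.

```python
import zlib

NUM_FEATURES = 2 ** 8  # size of the hashed feature space (illustrative)


def hash_index(term, num_features=NUM_FEATURES):
    """Map a "name=value" string to a bucket index.

    CRC32 is an assumption chosen because it is deterministic across runs;
    it is not the hash function Spark uses.
    """
    return zlib.crc32(term.encode("utf-8")) % num_features


def featurize(row):
    """Turn {"column": "value"} into a sparse vector {index: count}."""
    vec = {}
    for name, value in row.items():
        idx = hash_index("%s=%s" % (name, value))
        vec[idx] = vec.get(idx, 0.0) + 1.0
    return vec


# Stand-in for a trained model: only indices that occurred in the training
# data carry a non-zero coefficient. The value 1.0 is made up.
weights = {}
for label in ["foo", "bar", "baz"]:
    for idx in featurize({"feature": label}):
        weights[idx] = 1.0


def score(row):
    """Dot product of the sparse weights with the hashed row."""
    return sum(weights.get(idx, 0.0) * x for idx, x in featurize(row).items())


def pick_unseen_label(prefix="unseen"):
    """Return a label whose bucket was not touched in training.

    The search is only needed so the demo is not spoiled by a collision.
    """
    for i in range(1000):
        label = "%s%d" % (prefix, i)
        if hash_index("feature=%s" % label) not in weights:
            return label
    raise RuntimeError("no collision-free label found")


unseen = pick_unseen_label()
print(score({"feature": "foo"}))   # seen in training: 1.0
print(score({"feature": unseen}))  # unseen, no collision: 0.0
```

If an unseen label does collide with a trained bucket, it silently inherits
that bucket's coefficient, which is the hash-collision caveat mentioned
above; a larger feature space makes that less likely.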

On Thu, 4 Aug 2016 at 15:25 Ben Teeuwen <bteeu...@gmail.com> wrote:

> Hi Nick,
>
> Thanks for the suggestion. Reducing the dimensionality is an option,
> thanks, but let’s say I really want to do this :).
>
> The reason why it’s so big is that I’m unifying my training and test data,
> and I don’t want to drop rows in the test data just because one of the
> features was missing in the training data. I wouldn’t need this
> workaround if I had a better *strategy in Spark for dealing with
> missing levels*. How Spark can deal with it:
>
>
> *"Additionally, there are two strategies regarding how StringIndexer will
> handle unseen labels when you have fit a StringIndexer on one dataset and
> then use it to transform another:*
>
> * • throw an exception (which is the default)*
> * • skip the row containing the unseen label entirely"*
> http://spark.apache.org/docs/2.0.0/ml-features.html#stringindexer
>
> I like how *H2O* handles this;
>
> *"What happens during prediction if the new sample has categorical levels
> not seen in training? The value will be filled with either special
> missing level (if trained with missing values and missing_value_handling
> was set to MeanImputation) or 0."*
>
> https://github.com/h2oai/h2o-3/blob/master/h2o-docs/src/product/tutorials/datascience/DataScienceH2O-Dev.md
>
> So, assuming I need to unify the data (which makes it huge), and trying
> things out further in Scala, I see *these kinds of errors*:
> _____________
>
> scala> feedBack(s"Applying string indexers: fitting")
> 2016-08-04 10:13:20() | Applying string indexers: fitting
>
> scala> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
> pipelined: org.apache.spark.ml.Pipeline = pipeline_83be3b554e3a
>
> scala> val dfFitted = pipelined.fit(df)
> dfFitted: org.apache.spark.ml.PipelineModel = pipeline_83be3b554e3a
>
> scala> feedBack(s"Applying string indexers: transforming")
> 2016-08-04 10:17:29() | Applying string indexers: transforming
>
> scala> var df2 = dfFitted.transform(df)
> df2: org.apache.spark.sql.DataFrame = [myid: string, feature1: int ... 16
> more fields]
>
> scala>
>
> scala> feedBack(s"Applying OHE: fitting")
> 2016-08-04 10:18:07() | Applying OHE: fitting
>
> scala> val pipelined2 = new Pipeline().setStages(stagesOhe.toArray)
> pipelined2: org.apache.spark.ml.Pipeline = pipeline_ba7922a29322
>
> scala> val dfFitted2 = pipelined2.fit(df2)
> 16/08/04 10:21:41 WARN DFSClient: Slow ReadProcessor read fields took
> 85735ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR
> downstreamAckTimeNanos: 0, targets: [10.10.66.13:50010, 10.10.95.11:50010,
> 10.10.95.29:50010]
> 16/08/04 10:21:41 WARN DFSClient: DFSOutputStream ResponseProcessor
> exception  for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377
> java.io.IOException: Bad response ERROR for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 from
> datanode 10.10.95.11:50010
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
> 16/08/04 10:21:41 WARN DFSClient: Error Recovery for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 in
> pipeline 10.10.66.13:50010, 10.10.95.11:50010, 10.10.95.29:50010: bad
> datanode 10.10.95.11:50010
> dfFitted2: org.apache.spark.ml.PipelineModel = pipeline_ba7922a29322
>
> scala> feedBack(s"Applying OHE: transforming")
> 2016-08-04 10:29:12() | Applying OHE: transforming
>
> scala> df2 = dfFitted2.transform(df2).cache()
> 16/08/04 10:34:18 WARN DFSClient: DFSOutputStream ResponseProcessor
> exception  for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993414608
> java.io.EOFException: Premature EOF: no length prefix available
>         at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2203)
>         at
> org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> 16/08/04 10:34:18 WARN DFSClient: Error Recovery for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993414608 in
> pipeline 10.10.66.13:50010, 10.10.66.3:50010, 10.10.95.29:50010: bad
> datanode 10.10.66.13:50010
> 16/08/04 10:36:03 WARN DFSClient: Slow ReadProcessor read fields took
> 74146ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: SUCCESS
> status: ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.3:50010,
> 10.10.66.1:50010, 10.10.95.29:50010]
> 16/08/04 10:36:03 WARN DFSClient: DFSOutputStream ResponseProcessor
> exception  for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488
> java.io.IOException: Bad response ERROR for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 from
> datanode 10.10.95.29:50010
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
> 16/08/04 10:36:03 WARN DFSClient: Error Recovery for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 in
> pipeline 10.10.66.3:50010, 10.10.66.1:50010, 10.10.95.29:50010: bad
> datanode 10.10.95.29:50010
> 16/08/04 10:40:48 WARN DFSClient: Slow ReadProcessor read fields took
> 60891ms (threshold=30000ms); ack: seqno: -2 status:
>
> ____________________
>
> After 40 minutes or so, with no activity in the application master, it
> dies.
>
> Ben
>
> On Aug 4, 2016, at 12:14 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> Hi Ben
>
> Perhaps with this size cardinality it is worth looking at feature hashing
> for your problem. Spark has the HashingTF transformer that works on a
> column of "sentences" (i.e. [string]).
>
> For categorical features you can hack it a little by converting your
> feature value into a ["feature_name=feature_value"] representation. Then
> HashingTF can be used as is. Note you can also just do ["feature_value"],
> but the former would allow you, with a bit of munging, to hash all your
> feature columns at the same time.
>
> The advantage is speed and bounded memory footprint. The disadvantages
> include (i) no way to reverse the mapping from feature_index ->
> feature_name; (ii) potential for hash collisions (can be helped a bit by
> increasing your feature vector size).
>
> Here is a minimal example:
>
> In [1]: from pyspark.ml.feature import StringIndexer, OneHotEncoder,
> HashingTF
> In [2]: from pyspark.sql.types import StringType, ArrayType
> In [3]: from pyspark.sql.functions import udf
>
> In [4]: df = spark.createDataFrame([(0, "foo"), (1, "bar"), (2, "foo"),
> (3, "baz")], ["id", "feature"])
>
> In [5]: to_array = udf(lambda s: ["feature=%s" % s],
> ArrayType(StringType()))
>
> In [6]: df = df.withColumn("features", to_array("feature"))
>
> In [7]: df.show()
> +---+-------+-------------+
> | id|feature|     features|
> +---+-------+-------------+
> |  0|    foo|[feature=foo]|
> |  1|    bar|[feature=bar]|
> |  2|    foo|[feature=foo]|
> |  3|    baz|[feature=baz]|
> +---+-------+-------------+
>
> In [8]: indexer = StringIndexer(inputCol="feature",
> outputCol="feature_index")
>
> In [9]: indexed = indexer.fit(df).transform(df)
>
> In [10]: encoder = OneHotEncoder(dropLast=False, inputCol="feature_index",
> outputCol="feature_vector")
>
> In [11]: encoded = encoder.transform(indexed)
>
> In [12]: encoded.show()
> +---+-------+-------------+-------------+--------------+
> | id|feature|     features|feature_index|feature_vector|
> +---+-------+-------------+-------------+--------------+
> |  0|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
> |  1|    bar|[feature=bar]|          2.0| (3,[2],[1.0])|
> |  2|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
> |  3|    baz|[feature=baz]|          1.0| (3,[1],[1.0])|
> +---+-------+-------------+-------------+--------------+
>
> In [22]: hasher = HashingTF(numFeatures=2**8, inputCol="features",
> outputCol="features_vector")
>
> In [23]: hashed = hasher.transform(df)
>
> In [24]: hashed.show()
> +---+-------+-------------+-----------------+
> | id|feature|     features|  features_vector|
> +---+-------+-------------+-----------------+
> |  0|    foo|[feature=foo]| (256,[59],[1.0])|
> |  1|    bar|[feature=bar]|(256,[219],[1.0])|
> |  2|    foo|[feature=foo]| (256,[59],[1.0])|
> |  3|    baz|[feature=baz]| (256,[38],[1.0])|
> +---+-------+-------------+-----------------+
>
> On Thu, 4 Aug 2016 at 10:07 Ben Teeuwen <bteeu...@gmail.com> wrote:
>
>> I raised driver memory to 30G and maxresultsize to 25G, this time in
>> pyspark.
>>
>> *Code run:*
>>
>> cat_int  = ['bigfeature']
>>
>> stagesIndex = []
>> stagesOhe   = []
>> for c in cat_int:
>>   stagesIndex.append(StringIndexer(inputCol=c,
>> outputCol="{}Index".format(c)))
>>   stagesOhe.append(OneHotEncoder(dropLast= False, inputCol =
>> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>>
>> df2 = df
>>
>> for i in range(len(stagesIndex)):
>>   logging.info("Starting with {}".format(cat_int[i]))
>>   stagesIndex[i].fit(df2)
>>   logging.info("Fitted. Now transforming:")
>>   df2 = stagesIndex[i].fit(df2).transform(df2)
>>   logging.info("Transformed. Now showing transformed:")
>>   df2.show()
>>   logging.info("OHE")
>>   df2 = stagesOhe[i].transform(df2)
>>   logging.info("Fitted. Now showing OHE:")
>>   df2.show()
>>
>> *Now I get error:*
>>
>> 2016-08-04 08:53:44,839 INFO       Starting with bigfeature
>> ukStringIndexer_442b8e11e3294de9b83a
>> 2016-08-04 09:06:18,147 INFO       Fitted. Now transforming:
>> 16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 -
>> Cannot receive any reply in 120 seconds. This timeout is controlled by
>> spark.rpc.askTimeout
>> org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120
>> seconds. This timeout is controlled by spark.rpc.askTimeout
>>         at
>> org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>>         at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>>         at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>         at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>         at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
>>         at scala.util.Try$.apply(Try.scala:192)
>>         at scala.util.Failure.recover(Try.scala:216)
>>         at
>> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
>>         at
>> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
>>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>         at
>> org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
>>         at
>> scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
>>         at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>         at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>         at scala.concurrent.Promise$class.complete(Promise.scala:55)
>>         at
>> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
>>         at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>>         at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>         at
>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>>         at
>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>>         at
>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>>         at
>> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>>         at
>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>         at
>> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>>         at
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>         at
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>>         at
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>         at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>         at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>         at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
>>         at
>> scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
>>         at
>> org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
>>         at
>> org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
>>         at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>         at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>>         at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.util.concurrent.TimeoutException: Cannot receive any
>> reply in 120 seconds
>>         ... 8 more
>> 16/08/04 09:10:45 WARN TransportResponseHandler: Ignoring response for
>> RPC 4858888672840406395 from /10.10.80.4:59931 (47 bytes) since it is
>> not outstanding
>> 2016-08-04 09:12:07,016 INFO       Transformed. Now showing
>> transformed:
>> 16/08/04 09:13:48 WARN DFSClient: Slow ReadProcessor read fields took
>> 71756ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR
>> downstreamAckTimeNanos: 0, targets: [10.10.66.5:50010, 10.10.10.12:50010,
>> 10.10.91.9:50010]
>> 16/08/04 09:13:48 WARN DFSClient: DFSOutputStream ResponseProcessor
>> exception  for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265
>> java.io.IOException: Bad response ERROR for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 from
>> datanode 10.10.10.12:50010
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
>> 16/08/04 09:13:48 WARN DFSClient: Error Recovery for block
>> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 in
>> pipeline 10.10.66.5:50010, 10.10.10.12:50010, 10.192.91.9:50010: bad
>> datanode 10.10.10.12:50010
>> Traceback (most recent call last):
>>   File "<stdin>", line 7, in <module>
>>   File "/opt/spark/2.0.0/python/pyspark/sql/dataframe.py", line 287, in
>> show
>>     print(self._jdf.showString(n, truncate))
>>   File
>> "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
>> line 933, in __call__
>>   File "/opt/spark/2.0.0/python/pyspark/sql/utils.py", line 63, in deco
>>     return f(*a, **kw)
>>   File
>> "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line
>> 312, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o95.showString.
>> : java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>>         at java.util.Arrays.copyOf(Arrays.java:2271)
>>         at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>         at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>         at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>         at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>         at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>         at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>         at
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>>         at
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>>         at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>>         at
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>>         at
>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>>         at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>>         at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
>>         at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>>         at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
>>         at
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
>>         at
>> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>>         at
>> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
>>         at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>>
>> Ben
>>
>> On Aug 3, 2016, at 4:00 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>>
>> Hi,
>>
>> I want to one hot encode a column containing 56 million distinct values.
>> My dataset is 800m rows + 17 columns.
>> I first apply a StringIndexer, but it already breaks there, giving an *OOM
>> java heap space error.*
>>
>> *I launch my app on YARN with:*
>> /opt/spark/2.0.0/bin/spark-shell --executor-memory 10G --num-executors
>> 128 --executor-cores 2 --driver-memory 12G --conf
>> spark.driver.maxResultSize=8G
>>
>> *After grabbing the data, I run:*
>>
>> val catInts = Array("bigfeature")
>>
>> val stagesIndex =
>> scala.collection.mutable.ArrayBuffer.empty[StringIndexer]
>> val stagesOhe = scala.collection.mutable.ArrayBuffer.empty[OneHotEncoder]
>> for (c <- catInts) {
>>   println(s"starting with $c")
>>   val i = new StringIndexer()
>>     .setInputCol(c)
>>     .setOutputCol(s"${c}Index")
>>   stagesIndex += i
>>
>>   val o = new OneHotEncoder()
>>     .setDropLast(false)
>>     .setInputCol(s"${c}Index")
>>     .setOutputCol(s"${c}OHE")
>>   stagesOhe += o
>> }
>>
>> println(s"Applying string indexers: fitting")
>> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
>> val dfFitted = pipelined.fit(df)
>>
>>
>> *Then, the application master* shows a "countByValue at
>> StringIndexer.scala" taking 1.8 minutes (so very fast).
>> Afterwards, the shell console hangs for a while. *What is it doing now?*
>> After some time, it shows:
>>
>> scala> val dfFitted = pipelined.fit(df)
>>                                                                       
>> java.lang.OutOfMemoryError: Java heap space
>>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
>>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
>>   at 
>> org.apache.spark.util.collection.OpenHashMap$$anonfun$1.apply$mcVI$sp(OpenHashMap.scala:159)
>>   at 
>> org.apache.spark.util.collection.OpenHashSet.rehash(OpenHashSet.scala:230)
>>   at 
>> org.apache.spark.util.collection.OpenHashSet.rehashIfNeeded(OpenHashSet.scala:167)
>>   at 
>> org.apache.spark.util.collection.OpenHashMap$mcD$sp.update$mcD$sp(OpenHashMap.scala:86)
>>   at 
>> org.apache.spark.ml.feature.StringIndexerModel.<init>(StringIndexer.scala:137)
>>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:93)
>>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66)
>>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
>>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
>>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>   at 
>> scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
>>   at 
>> scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
>>   at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
>>   ... 16 elided
>>
>>
>>
>>
>>
>>
>
