Hi Ben

Perhaps with cardinality of this size it is worth looking at feature hashing
for your problem. Spark has the HashingTF transformer, which works on a
column of "sentences" (i.e. an array of strings, [string]).

For categorical features you can hack it a little by converting each
feature value into a ["feature_name=feature_value"] representation; HashingTF
can then be used as is. Note you can also just do ["feature_value"],
but the former would allow you, with a bit of munging, to hash all your
feature columns at the same time (see the sketch after the minimal example
below).

The advantages are speed and a bounded memory footprint. The disadvantages
include (i) no way to reverse the mapping from feature_index ->
feature_name; and (ii) the potential for hash collisions (which can be
mitigated somewhat by increasing your feature vector size; a rough collision
check is also sketched below).

Here is a minimal example:

In [1]: from pyspark.ml.feature import StringIndexer, OneHotEncoder, HashingTF
In [2]: from pyspark.sql.types import StringType, ArrayType
In [3]: from pyspark.sql.functions import udf

In [4]: df = spark.createDataFrame([(0, "foo"), (1, "bar"), (2, "foo"), (3, "baz")], ["id", "feature"])

In [5]: to_array = udf(lambda s: ["feature=%s" % s], ArrayType(StringType()))

In [6]: df = df.withColumn("features", to_array("feature"))

In [7]: df.show()
+---+-------+-------------+
| id|feature|     features|
+---+-------+-------------+
|  0|    foo|[feature=foo]|
|  1|    bar|[feature=bar]|
|  2|    foo|[feature=foo]|
|  3|    baz|[feature=baz]|
+---+-------+-------------+

In [8]: indexer = StringIndexer(inputCol="feature", outputCol="feature_index")

In [9]: indexed = indexer.fit(df).transform(df)

In [10]: encoder = OneHotEncoder(dropLast=False, inputCol="feature_index", outputCol="feature_vector")

In [11]: encoded = encoder.transform(indexed)

In [12]: encoded.show()
+---+-------+-------------+-------------+--------------+
| id|feature|     features|feature_index|feature_vector|
+---+-------+-------------+-------------+--------------+
|  0|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
|  1|    bar|[feature=bar]|          2.0| (3,[2],[1.0])|
|  2|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
|  3|    baz|[feature=baz]|          1.0| (3,[1],[1.0])|
+---+-------+-------------+-------------+--------------+

In [22]: hasher = HashingTF(numFeatures=2**8, inputCol="features", outputCol="features_vector")

In [23]: hashed = hasher.transform(df)

In [24]: hashed.show()
+---+-------+-------------+-----------------+
| id|feature|     features|  features_vector|
+---+-------+-------------+-----------------+
|  0|    foo|[feature=foo]| (256,[59],[1.0])|
|  1|    bar|[feature=bar]|(256,[219],[1.0])|
|  2|    foo|[feature=foo]| (256,[59],[1.0])|
|  3|    baz|[feature=baz]| (256,[38],[1.0])|
+---+-------+-------------+-----------------+
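
To hash several categorical columns in one pass (the munging mentioned
above), a rough sketch could look like the following. The column names colA
and colB are hypothetical placeholders and the snippet is untested; it is
only meant to illustrate the idea:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import HashingTF

# Hypothetical categorical columns; substitute your own.
cat_cols = ["colA", "colB"]

# Build one ["colA=valA", "colB=valB", ...] array per row so that a single
# HashingTF pass covers all categorical features at once.
to_tokens = udf(lambda *vals: ["%s=%s" % (c, v) for c, v in zip(cat_cols, vals)],
                ArrayType(StringType()))

df_all = df.withColumn("all_features", to_tokens(*cat_cols))

hasher = HashingTF(numFeatures=2**20, inputCol="all_features",
                   outputCol="all_features_vector")
hashed_all = hasher.transform(df_all)

Each row then becomes a sparse vector with (at most) one active index per
categorical column; pick numFeatures comfortably above the total number of
distinct "column=value" tokens to keep collisions down.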
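
To get a feel for collisions at a given numFeatures (point (ii) above), one
crude check (just a sketch, not part of any Spark API) is to compare the
number of distinct raw values with the number of distinct hashed indices,
here using the hashed DataFrame from the example above:

from pyspark.sql.functions import countDistinct, udf
from pyspark.sql.types import IntegerType

# Distinct raw values vs. distinct hashed indices; a gap means collisions.
n_values = df.select(countDistinct("feature")).first()[0]

# Each hashed row above holds exactly one active index, so pull it out with a UDF.
first_index = udf(lambda v: int(v.indices[0]), IntegerType())
n_indices = (hashed
             .select(first_index("features_vector").alias("idx"))
             .agg(countDistinct("idx"))
             .first()[0])

print(n_values, n_indices)  # equal -> no collisions at this numFeatures

If the two counts diverge more than you are comfortable with, increase
numFeatures; with tens of millions of distinct values some collisions are
usually unavoidable, and for linear models they are often tolerable.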

On Thu, 4 Aug 2016 at 10:07 Ben Teeuwen <bteeu...@gmail.com> wrote:

> I raised driver memory to 30G and spark.driver.maxResultSize to 25G, this
> time in PySpark.
>
> *Code run:*
>
> cat_int  = ['bigfeature']
>
> stagesIndex = []
> stagesOhe   = []
> for c in cat_int:
>   stagesIndex.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
>   stagesOhe.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))
>
> df2 = df
>
> for i in range(len(stagesIndex)):
>   logging.info("Starting with {}".format(cat_int[i]))
>   stagesIndex[i].fit(df2)
>   logging.info("Fitted. Now transforming:")
>   df2 = stagesIndex[i].fit(df2).transform(df2)
>   logging.info("Transformed. Now showing transformed:")
>   df2.show()
>   logging.info("OHE")
>   df2 = stagesOhe[i].transform(df2)
>   logging.info("Fitted. Now showing OHE:")
>   df2.show()
>
> *Now I get this error:*
>
> 2016-08-04 08:53:44,839 INFO       Starting with bigfeature
> ukStringIndexer_442b8e11e3294de9b83a
> 2016-08-04 09:06:18,147 INFO       Fitted. Now transforming:
> 16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 -
> Cannot receive any reply in 120 seconds. This timeout is controlled by
> spark.rpc.askTimeout
> org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120
> seconds. This timeout is controlled by spark.rpc.askTimeout
>         at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>         at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>         at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>         at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
>         at scala.util.Try$.apply(Try.scala:192)
>         at scala.util.Failure.recover(Try.scala:216)
>         at
> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
>         at
> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at
> org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
>         at
> scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
>         at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>         at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>         at scala.concurrent.Promise$class.complete(Promise.scala:55)
>         at
> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
>         at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>         at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>         at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>         at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>         at
> scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>         at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>         at
> scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>         at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>         at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>         at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>         at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>         at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
>         at
> scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
>         at org.apache.spark.rpc.netty.NettyRpcEnv.org
> $apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
>         at
> org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply
> in 120 seconds
>         ... 8 more
> 16/08/04 09:10:45 WARN TransportResponseHandler: Ignoring response for RPC
> 4858888672840406395 from /10.10.80.4:59931 (47 bytes) since it is not
> outstanding
> 2016-08-04 09:12:07,016 INFO       Transformed. Now showing transformed:
> 16/08/04 09:13:48 WARN DFSClient: Slow ReadProcessor read fields took
> 71756ms (threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR
> downstreamAckTimeNanos: 0, targets: [10.10.66.5:50010, 10.10.10.12:50010,
> 10.10.91.9:50010]
> 16/08/04 09:13:48 WARN DFSClient: DFSOutputStream ResponseProcessor
> exception  for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265
> java.io.IOException: Bad response ERROR for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 from
> datanode 10.10.10.12:50010
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
> 16/08/04 09:13:48 WARN DFSClient: Error Recovery for block
> BP-2111192564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 in
> pipeline 10.10.66.5:50010, 10.10.10.12:50010, 10.192.91.9:50010: bad
> datanode 10.10.10.12:50010
> Traceback (most recent call last):
>   File "<stdin>", line 7, in <module>
>   File "/opt/spark/2.0.0/python/pyspark/sql/dataframe.py", line 287, in
> show
>     print(self._jdf.showString(n, truncate))
>   File
> "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
> line 933, in __call__
>   File "/opt/spark/2.0.0/python/pyspark/sql/utils.py", line 63, in deco
>     return f(*a, **kw)
>   File "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
> line 312, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o95.showString.
> : java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>         at java.util.Arrays.copyOf(Arrays.java:2271)
>         at
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>         at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>         at
> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>         at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>         at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>         at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>         at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>         at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>         at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>         at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
>         at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>         at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
>         at
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>         at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>         at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>         at
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
>         at
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
>         at
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>         at
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
>         at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>
> Ben
>
> On Aug 3, 2016, at 4:00 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>
> Hi,
>
> I want to one hot encode a column containing 56 million distinct values.
> My dataset is 800m rows + 17 columns.
> I first apply a StringIndexer, but it already breaks there, giving an *OOM
> java heap space error*.
>
> *I launch my app on YARN with:*
> /opt/spark/2.0.0/bin/spark-shell --executor-memory 10G --num-executors 128 --executor-cores 2 --driver-memory 12G --conf spark.driver.maxResultSize=8G
>
> *After grabbing the data, I run:*
>
> val catInts = Array("bigfeature")
>
> val stagesIndex = scala.collection.mutable.ArrayBuffer.empty[StringIndexer]
> val stagesOhe = scala.collection.mutable.ArrayBuffer.empty[OneHotEncoder]
> for (c <- catInts) {
>   println(s"starting with $c")
>   val i = new StringIndexer()
>     .setInputCol(c)
>     .setOutputCol(s"${c}Index")
>   stagesIndex += i
>
>   val o = new OneHotEncoder()
>     .setDropLast(false)
>     .setInputCol(s"${c}Index")
>     .setOutputCol(s"${c}OHE")
>   stagesOhe += o
> }
>
> println(s"Applying string indexers: fitting")
> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
> val dfFitted = pipelined.fit(df)
>
>
> *Then, the application master* shows a "countByValue at StringIndexer.scala"
> taking 1.8 minutes (so very fast). Afterwards, the shell console hangs for a
> while. *What is it doing now?* After some time, it shows:
>
> scala> val dfFitted = pipelined.fit(df)
> java.lang.OutOfMemoryError: Java heap space
>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
>   at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
>   at 
> org.apache.spark.util.collection.OpenHashMap$$anonfun$1.apply$mcVI$sp(OpenHashMap.scala:159)
>   at 
> org.apache.spark.util.collection.OpenHashSet.rehash(OpenHashSet.scala:230)
>   at 
> org.apache.spark.util.collection.OpenHashSet.rehashIfNeeded(OpenHashSet.scala:167)
>   at 
> org.apache.spark.util.collection.OpenHashMap$mcD$sp.update$mcD$sp(OpenHashMap.scala:86)
>   at 
> org.apache.spark.ml.feature.StringIndexerModel.<init>(StringIndexer.scala:137)
>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:93)
>   at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66)
>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
>   at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
>   at 
> scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
>   at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
>   ... 16 elided
>