I don’t think scaling RAM is a sane strategy for fixing these problems with 
using a dataframe / transformer approach to create large sparse vectors.

First, although more RAM delays the failure, it will still fail. I tried this 
in the original case I emailed about, and after waiting 50 minutes it 
still broke.

Second, if you don’t use dataframes / transformers, but write your own 
functions to do one hot encoding and create sparse vectors, it easily 
works on small boxes. E.g. build up a dictionary with unique index numbers for 
all unique values, and look up these indexes when creating sparse vectors:

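def makeDict(df, columnName):
    # Map each distinct value of the column to a unique index (Python 2: unicode).
    # In Spark 2.0 a PySpark DataFrame has no .map, so go through .rdd first.
    mapping = (df.select(columnName).rdd
               .map(lambda x: unicode(x[0]))
               .distinct().zipWithIndex().collectAsMap())
    # Reserve one extra index for values not seen when the dictionary was built.
    mapping["missing"] = len(mapping)
    return mapping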

def encodeOneHot(x, column):
    key = "missing"
    if unicode(x) in mappings_bc[column]:
        key = unicode(x)
    return Vectors.sparse(len(mappings_bc[column]), [mappings_bc[column][key]], [1.0])
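
To make the idea concrete without a cluster, here is a minimal sketch of the same dictionary approach in plain Python 3 (no Spark). The names make_dict and encode_one_hot are hypothetical stand-ins for the functions above, and the returned tuple only mirrors the (size, indices, values) arguments that Vectors.sparse takes; it is not a Spark vector. One assumption differs from the code above: setdefault is used so that a real level that happens to be called "missing" keeps its index.

```python
def make_dict(values):
    """Map every distinct value to a unique index and reserve one for 'missing'."""
    mapping = {}
    for v in values:
        key = str(v)
        if key not in mapping:
            mapping[key] = len(mapping)
    # setdefault keeps an existing 'missing' level instead of overwriting its index.
    mapping.setdefault("missing", len(mapping))
    return mapping


def encode_one_hot(x, mapping):
    """Return (size, indices, values) for a one hot encoded value."""
    key = str(x) if str(x) in mapping else "missing"
    return (len(mapping), [mapping[key]], [1.0])


levels = ["a", "b", "a", "c"]
mapping = make_dict(levels)
seen = encode_one_hot("b", mapping)    # a level present in the dictionary
unseen = encode_one_hot("z", mapping)  # falls back to the 'missing' index
```

Each encoded value carries a single index and a single 1.0, so the memory per row stays constant however many levels the dictionary holds; only the dictionary itself grows with the number of levels.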

Ben

> On Aug 19, 2016, at 11:34 PM, Davies Liu <dav...@databricks.com> wrote:
> 
> The OOM happens in the driver; you may also need more memory for the driver.
> 
> On Fri, Aug 19, 2016 at 2:33 PM, Davies Liu <dav...@databricks.com> wrote:
>> You are using lots of tiny executors (128 executors with only 2G
>> memory each). Could you try with bigger executors (for example 16G x 16)?
>> 
>> On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>>> 
>>> So I wrote some code to reproduce the problem.
>>> 
>>> I assume here that a pipeline should be able to transform a categorical 
>>> feature with a few million levels.
>>> So I create a dataframe with the categorical feature (‘id’), apply a 
>>> StringIndexer and OneHotEncoder transformer, and run a loop where I 
>>> increase the number of levels.
>>> It breaks at 1,276,000 levels.
>>> 
>>> Shall I report this as a ticket in JIRA?
>>> 
>>> ____________
>>> 
>>> 
>>> from pyspark.sql.functions import rand
>>> from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
>>> from pyspark.ml import Pipeline
>>> 
>>> start_id = 100000
>>> n = 5000000
>>> step = (n - start_id) / 25
>>> 
>>> for i in xrange(start_id,start_id + n,step):
>>>    print "#########\n {}".format(i)
>>>    dfr = (sqlContext
>>>           .range(start_id, start_id + i)
>>>           .withColumn('label', rand(seed=10))
>>>           .withColumn('feat2', rand(seed=101))
>>>    #        .withColumn('normal', randn(seed=27))
>>>           ).repartition(32).cache()
>>>    # dfr.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
>>>    dfr.show(1)
>>>    print "This dataframe has {0} rows (and therefore {0} levels will be one hot encoded)".format(dfr.count())
>>> 
>>>    categorical_feature  = ['id']
>>>    stages = []
>>> 
>>>    for c in categorical_feature:
>>>        stages.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
>>>        stages.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c),
>>>                                    outputCol="{}OHE".format(c)))
>>> 
>>>    columns = ["{}OHE".format(x) for x in categorical_feature]
>>>    columns.append('feat2')
>>> 
>>>    assembler = VectorAssembler(
>>>        inputCols=columns,
>>>        outputCol="features")
>>>    stages.append(assembler)
>>> 
>>>    df2 = dfr
>>> 
>>>    pipeline = Pipeline(stages=stages)
>>>    pipeline_fitted = pipeline.fit(df2)
>>>    df3 = pipeline_fitted.transform(df2)
>>>    df3.show(1)
>>>    dfr.unpersist()
>>> 
>>> 
>>> ____________
>>> 
>>> Output:
>>> 
>>> 
>>> #########
>>> 100000
>>> +------+---------------------------+-------------------+
>>> |    id|label                      |              feat2|
>>> +------+---------------------------+-------------------+
>>> |183601|        0.38693226548356197|0.04485291680169634|
>>> +------+---------------------------+-------------------+
>>> only showing top 1 row
>>> 
>>> This dataframe has 100000 rows (and therefore 100000 levels will be one hot encoded)
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> |    id|label                      |              feat2|idIndex|               idOHE|            features|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> |183601|        0.38693226548356197|0.04485291680169634|83240.0|(100000,[83240],[...|(100001,[83240,10...|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> only showing top 1 row
>>> 
>>> #########
>>> 296000
>>> +------+---------------------------+-------------------+
>>> |    id|label                      |              feat2|
>>> +------+---------------------------+-------------------+
>>> |137008|         0.2996020619810592|0.38693226548356197|
>>> +------+---------------------------+-------------------+
>>> only showing top 1 row
>>> 
>>> This dataframe has 296000 rows (and therefore 296000 levels will be one hot encoded)
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> |    id|label                      |              feat2|idIndex|               idOHE|            features|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> |137008|         0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> only showing top 1 row
>>> 
>>> #########
>>> 492000
>>> +------+---------------------------+-------------------+
>>> |    id|label                      |              feat2|
>>> +------+---------------------------+-------------------+
>>> |534351|         0.9450641392552516|0.23472935141246665|
>>> +------+---------------------------+-------------------+
>>> only showing top 1 row
>>> 
>>> This dataframe has 492000 rows (and therefore 492000 levels will be one hot encoded)
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> |    id|label                      |              feat2|idIndex|               idOHE|            features|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> |534351|         0.9450641392552516|0.23472935141246665| 3656.0|(492000,[3656],[1...|(492001,[3656,492...|
>>> +------+---------------------------+-------------------+-------+--------------------+--------------------+
>>> only showing top 1 row
>>> 
>>> #########
>>> 688000
>>> +------+---------------------------+------------------+
>>> |    id|label                      |             feat2|
>>> +------+---------------------------+------------------+
>>> |573008|         0.3059347083549171|0.4846147657830415|
>>> +------+---------------------------+------------------+
>>> only showing top 1 row
>>> 
>>> This dataframe has 688000 rows (and therefore 688000 levels will be one hot encoded)
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> |    id|label                      |             feat2| idIndex|               idOHE|            features|
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> |573008|         0.3059347083549171|0.4846147657830415|475855.0|(688000,[475855],...|(688001,[475855,6...|
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> only showing top 1 row
>>> 
>>> #########
>>> 884000
>>> +------+---------------------------+------------------+
>>> |    id|label                      |             feat2|
>>> +------+---------------------------+------------------+
>>> |970195|        0.34345290476989165|0.9843176058907069|
>>> +------+---------------------------+------------------+
>>> only showing top 1 row
>>> 
>>> This dataframe has 884000 rows (and therefore 884000 levels will be one hot encoded)
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> |    id|label                      |             feat2| idIndex|               idOHE|            features|
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> |970195|        0.34345290476989165|0.9843176058907069|333915.0|(884000,[333915],...|(884001,[333915,8...|
>>> +------+---------------------------+------------------+--------+--------------------+--------------------+
>>> only showing top 1 row
>>> 
>>> #########
>>> 1080000
>>> +------+---------------------------+-----------------+
>>> |    id|label                      |            feat2|
>>> +------+---------------------------+-----------------+
>>> |403758|         0.6333344187975314|0.774327685753309|
>>> +------+---------------------------+-----------------+
>>> only showing top 1 row
>>> 
>>> This dataframe has 1080000 rows (and therefore 1080000 levels will be one hot encoded)
>>> +------+---------------------------+-----------------+--------+--------------------+--------------------+
>>> |    id|label                      |            feat2| idIndex|               idOHE|            features|
>>> +------+---------------------------+-----------------+--------+--------------------+--------------------+
>>> |403758|         0.6333344187975314|0.774327685753309|287898.0|(1080000,[287898]...|(1080001,[287898,...|
>>> +------+---------------------------+-----------------+--------+--------------------+--------------------+
>>> only showing top 1 row
>>> 
>>> #########
>>> 1276000
>>> +------+---------------------------+------------------+
>>> |    id|label                      |             feat2|
>>> +------+---------------------------+------------------+
>>> |508726|         0.2513814327408137|0.8480577183702391|
>>> +------+---------------------------+------------------+
>>> only showing top 1 row
>>> 
>>> This dataframe has 1276000 rows (and therefore 1276000 levels will be one hot encoded)
>>> 
>>> ---------------------------------------------------------------------------
>>> Py4JJavaError                             Traceback (most recent call last)
>>> <ipython-input-2-f5c9fe263872> in <module>()
>>>     38     pipeline = Pipeline(stages=stages)
>>>     39     pipeline_fitted = pipeline.fit(df2)
>>> ---> 40     df3 = pipeline_fitted.transform(df2)
>>>     41     df3.show(1)
>>>     42     dfr.unpersist()
>>> 
>>> /opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
>>>    103                 return self.copy(params)._transform(dataset)
>>>    104             else:
>>> --> 105                 return self._transform(dataset)
>>>    106         else:
>>>    107             raise ValueError("Params must be a param map but got %s." % type(params))
>>> 
>>> /opt/spark/2.0.0/python/pyspark/ml/pipeline.py in _transform(self, dataset)
>>>    196     def _transform(self, dataset):
>>>    197         for t in self.stages:
>>> --> 198             dataset = t.transform(dataset)
>>>    199         return dataset
>>>    200
>>> 
>>> /opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
>>>    103                 return self.copy(params)._transform(dataset)
>>>    104             else:
>>> --> 105                 return self._transform(dataset)
>>>    106         else:
>>>    107             raise ValueError("Params must be a param map but got %s." % type(params))
>>> 
>>> /opt/spark/2.0.0/python/pyspark/ml/wrapper.py in _transform(self, dataset)
>>>    227     def _transform(self, dataset):
>>>    228         self._transfer_params_to_java()
>>> --> 229         return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
>>>    230
>>>    231
>>> 
>>> /opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>>    931         answer = self.gateway_client.send_command(command)
>>>    932         return_value = get_return_value(
>>> --> 933             answer, self.gateway_client, self.target_id, self.name)
>>>    934
>>>    935         for temp_arg in temp_args:
>>> 
>>> /opt/spark/2.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
>>>     61     def deco(*a, **kw):
>>>     62         try:
>>> ---> 63             return f(*a, **kw)
>>>     64         except py4j.protocol.Py4JJavaError as e:
>>>     65             s = e.java_exception.toString()
>>> 
>>> /opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>>>    310                 raise Py4JJavaError(
>>>    311                     "An error occurred while calling {0}{1}{2}.\n".
>>> --> 312                     format(target_id, ".", name), value)
>>>    313             else:
>>>    314                 raise Py4JError(
>>> 
>>> Py4JJavaError: An error occurred while calling o408.transform.
>>> : java.lang.OutOfMemoryError: GC overhead limit exceeded
>>> at scala.collection.immutable.Stream$.from(Stream.scala:1262)
>>> at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
>>> at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
>>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
>>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
>>> at scala.collection.LinearSeqOptimized$class.loop$1(LinearSeqOptimized.scala:274)
>>> at scala.collection.LinearSeqOptimized$class.lengthCompare(LinearSeqOptimized.scala:277)
>>> at scala.collection.immutable.Stream.lengthCompare(Stream.scala:202)
>>> at scala.collection.SeqViewLike$Zipped$class.length(SeqViewLike.scala:133)
>>> at scala.collection.SeqViewLike$$anon$9.length(SeqViewLike.scala:203)
>>> at scala.collection.SeqViewLike$Mapped$class.length(SeqViewLike.scala:66)
>>> at scala.collection.SeqViewLike$$anon$3.length(SeqViewLike.scala:197)
>>> at scala.collection.SeqLike$class.size(SeqLike.scala:106)
>>> at scala.collection.SeqViewLike$AbstractTransformed.size(SeqViewLike.scala:37)
>>> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:285)
>>> at scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
>>> at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
>>> at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
>>> at scala.Option.map(Option.scala:146)
>>> at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
>>> at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
>>> at org.apache.spark.ml.attribute.AttributeGroup$.fromMetadata(AttributeGroup.scala:234)
>>> at org.apache.spark.ml.attribute.AttributeGroup$.fromStructField(AttributeGroup.scala:246)
>>> at org.apache.spark.ml.feature.OneHotEncoder.transform(OneHotEncoder.scala:139)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>> at py4j.Gateway.invoke(Gateway.java:280)
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>> 
>>> 
>>> 
>>> 
>>> Spark Properties
>>> Name                                  Value
>>> spark.app.name                        pyspark-shell
>>> spark.driver.cores                    1
>>> spark.driver.extraJavaOptions         -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
>>> spark.driver.memory                   2g
>>> spark.dynamicAllocation.enabled       FALSE
>>> spark.eventLog.dir                    hdfs:///spark/history
>>> spark.eventLog.enabled                TRUE
>>> spark.executor.cores                  1
>>> spark.executor.extraJavaOptions       -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
>>> spark.executor.id                     driver
>>> spark.executor.instances              128
>>> spark.executor.memory                 2g
>>> spark.history.fs.logDirectory         hdfs:///spark/history
>>> spark.master                          yarn-client
>>> spark.memory.fraction                 0.7
>>> spark.memory.storageFraction          0.5
>>> spark.rdd.compress                    TRUE
>>> spark.scheduler.mode                  FIFO
>>> spark.serializer.objectStreamReset    100
>>> spark.shuffle.service.enabled         FALSE
>>> spark.speculation                     TRUE
>>> spark.submit.deployMode               client
>>> spark.task.maxFailures                10
>>> spark.yarn.executor.memoryOverhead    2048
>>> spark.yarn.isPython                   TRUE
>>> 
>>> 
>>> On Aug 11, 2016, at 10:24 PM, Nick Pentreath <nick.pentre...@gmail.com> 
>>> wrote:
>>> 
>>> Ok, interesting. Would be interested to see how it compares.
>>> 
>>> By the way, the feature size you select for the hasher should be a power of 
>>> 2 (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes 
>>> are evenly distributed (see the section on HashingTF under 
>>> http://spark.apache.org/docs/latest/ml-features.html#tf-idf).
>>> 
>>> On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen <bteeu...@gmail.com> wrote:
>>>> 
>>>> Thanks Nick, I played around with the hashing trick. When I set 
>>>> numFeatures to the number of distinct values for the largest sparse 
>>>> feature, I ended up with half of them colliding. When raising 
>>>> numFeatures to get fewer collisions, I soon ended up with the same memory 
>>>> problems as before. To be honest, I didn’t test the impact of having more 
>>>> or fewer collisions on the quality of the predictions, but tunnel-visioned 
>>>> into getting it to work with the full sparsity.
>>>> 
>>>> Before, I worked in RDD land: zipWithIndex on an RDD with distinct values + 
>>>> one entry ‘missing’ for missing values during predict, collectAsMap, 
>>>> broadcast the map, a udf generating the sparse vector, and assembling the 
>>>> vectors manually. To move into dataframe land, I wrote:
>>>> 
>>>> from collections import defaultdict
>>>> import logging
>>>> 
>>>> def getMappings(mode):
>>>>    mappings = defaultdict(dict)
>>>>    max_index = 0
>>>>    for c in cat_int[:]:    # for every categorical variable
>>>> 
>>>>        logging.info("starting with {}".format(c))
>>>>        if mode == 'train':
>>>>            grouped = (df2
>>>>                # get counts, ordered from largest to smallest
>>>>                .groupBy(c).count().orderBy('count', ascending = False)
>>>>                # prepare for window function summing up 1s before current row to create a RANK
>>>>                .selectExpr("*", "1 as n")
>>>>                .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
>>>>                # drop the column with static 1 values used for the cumulative sum
>>>>                .drop('n')
>>>>                )
>>>>            logging.info("Got {} rows.".format(grouped.count()))
>>>>            grouped.show()
>>>>            logging.info('getting max')
>>>>            # update the max index so the next categorical feature starts with it
>>>>            max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>>>>            logging.info("max_index has become: {}".format(max_index))
>>>>            logging.info('adding missing value, so we also train on this and prediction data missing it. ')
>>>>            schema = grouped.schema
>>>>            logging.info(schema)
>>>>            # add an index for 'missing', used for values during predict that are unseen during training
>>>>            grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))
>>>>            max_index += 1
>>>>            saveto = "{}/{}".format(path, c)
>>>>            logging.info("Writing to: {}".format(saveto))
>>>>            grouped.write.parquet(saveto, mode = 'overwrite')
>>>> 
>>>>        elif mode == 'predict':
>>>>            loadfrom = "{}/{}".format(path, c)
>>>>            logging.info("Reading from: {}".format(loadfrom))
>>>>            grouped = spark.read.parquet(loadfrom)
>>>> 
>>>>        logging.info("Adding to dictionary")
>>>>        # build up dictionary to be broadcasted later on, used for creating sparse vectors
>>>>        mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()
>>>>        max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>>>> 
>>>>    logging.info("Sanity check for indexes:")
>>>>    for c in cat_int[:]:
>>>>        # some logging to confirm the indexes
>>>>        logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))
>>>>        logging.info("Missing value = {}".format(mappings[c]['missing']))
>>>>    return max_index, mappings
>>>> 
>>>> I’d love to see the StringIndexer + OneHotEncoder transformers cope with 
>>>> missing values during prediction; for now I’ll work with the hacked stuff 
>>>> above :).
>>>> (.. and I should compare the performance with using the hashing trick.)
>>>> 
>>>> Ben
>>> 
>>> 

