Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-22 Thread Ben Teeuwen
I don’t think scaling RAM is a sane strategy for fixing these problems with 
using a dataframe / transformer approach to creating large sparse vectors.

First, though adding RAM delays the point at which it fails, it still fails. I 
tried this for the original case I emailed about, and after waiting 50 minutes 
it still broke.

Second, if you don’t use dataframes / transformers but write your own 
functions to do one hot encoding and create sparse vectors, it works easily 
on small boxes. E.g. build up a dictionary with a unique index number for 
every unique value, and look up these indexes when creating the sparse vectors:

from pyspark.ml.linalg import Vectors   # pyspark.mllib.linalg on Spark < 2.0

def makeDict(df, columnName):
    # value -> index for every distinct value, plus a 'missing' bucket at the end
    mapping = (df.select(columnName).rdd
                 .map(lambda x: unicode(x[0]))
                 .distinct()
                 .zipWithIndex()
                 .collectAsMap())
    mapping["missing"] = len(mapping)
    return mapping

def encodeOneHot(x, column):
    # mappings_bc holds the per-column {value: index} dicts built with makeDict
    key = "missing"
    if unicode(x) in mappings_bc[column]:
        key = unicode(x)
    return Vectors.sparse(len(mappings_bc[column]),
                          [mappings_bc[column][key]], [1.0])
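
To wire these together, something like this (a sketch: the column list is an example, 
df is whatever dataframe holds the raw columns, and for a really wide column you’d 
broadcast the dict once with sc.broadcast and read it via .value instead of shipping 
it inside the closure):

from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT

categorical_cols = ["id"]                                      # example column list
mappings_bc = {c: makeDict(df, c) for c in categorical_cols}   # plain dict here; broadcast it for huge columns

for c in categorical_cols:
    # one sparse OHE column per categorical column, via the dictionary lookup above
    ohe = udf(lambda x, col=c: encodeOneHot(x, col), VectorUDT())
    df = df.withColumn("{}OHE".format(c), ohe(df[c]))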

Ben

> On Aug 19, 2016, at 11:34 PM, Davies Liu  wrote:
> 
> The OOM happen in driver, you may also need more memory for driver.
> 
> On Fri, Aug 19, 2016 at 2:33 PM, Davies Liu  wrote:
>> You are using lots of tiny executors (128 executor with only 2G
>> memory), could you try with bigger executor (for example 16G x 16)?
>> 
>> On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen  wrote:
>>> 
>>> So I wrote some code to reproduce the problem.
>>> 
>>> I assume here that a pipeline should be able to transform a categorical 
>>> feature with a few million levels.
>>> So I create a dataframe with the categorical feature (‘id’), apply a 
>>> StringIndexer and OneHotEncoder transformer, and run a loop where I 
>>> increase the amount of levels.
>>> It breaks at 1.276.000 levels.
>>> 
>>> Shall I report this as a ticket in JIRA?
>>> 
>>> 
>>> 
>>> 
>>> from pyspark.sql.functions import rand
>>> from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
>>> from pyspark.ml import Pipeline
>>> 
>>> start_id = 10
>>> n = 500
>>> step = (n - start_id) / 25
>>> 
>>> for i in xrange(start_id,start_id + n,step):
>>>print "#\n {}".format(i)
>>>dfr = (sqlContext
>>>   .range(start_id, start_id + i)
>>>   .withColumn(‘label', rand(seed=10))
>>>   .withColumn('feat2', rand(seed=101))
>>>#.withColumn('normal', randn(seed=27))
>>>   ).repartition(32).cache()
>>># dfr.select("id", rand(seed=10).alias("uniform"), 
>>> randn(seed=27).alias("normal")).show()
>>>dfr.show(1)
>>>print "This dataframe has {0} rows (and therefore {0} levels will be one 
>>> hot encoded)".format(dfr.count())
>>> 
>>>categorical_feature  = ['id']
>>>stages = []
>>> 
>>>for c in categorical_feature:
>>>stages.append(StringIndexer(inputCol=c, 
>>> outputCol="{}Index".format(c)))
>>>stages.append(OneHotEncoder(dropLast= False, inputCol = 
>>> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>>> 
>>>columns = ["{}OHE".format(x) for x in categorical_feature]
>>>columns.append('feat2')
>>> 
>>>assembler = VectorAssembler(
>>>inputCols=columns,
>>>outputCol="features")
>>>stages.append(assembler)
>>> 
>>>df2 = dfr
>>> 
>>>pipeline = Pipeline(stages=stages)
>>>pipeline_fitted = pipeline.fit(df2)
>>>df3 = pipeline_fitted.transform(df2)
>>>df3.show(1)
>>>dfr.unpersist()
>>> 
>>> 
>>> 
>>> 
>>> Output:
>>> 
>>> 
>>> #
>>> 10
>>> +--+---+---+
>>> |id|label  |  feat2|
>>> +--+---+---+
>>> |183601|0.38693226548356197|0.04485291680169634|
>>> +--+---+---+
>>> only showing top 1 row
>>> 
>>> This dataframe has 10 rows (and therefore 10 levels will be one hot 
>>> encoded)
>>> +--+---+---+---+++
>>> |id|label  |  feat2|idIndex|
>>>idOHE|features|
>>> +--+---+---+---+++
>>> |183601|
>>> 0.38693226548356197|0.04485291680169634|83240.0|(10,[83240],[...|(11,[83240,10...|
>>> +--+---+---+---+++
>>> only showing top 1 row
>>> 
>>> #
>>> 296000
>>> +--+---+---+
>>> |id|label  |  feat2|
>>> +--+---+---+
>>> |137008| 0.2996020619810592|0.38693226548356197|
>>> +--+---+---+
>>> only showing top 1 row
>>> 
>>> This 

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-19 Thread Davies Liu
The OOM happens in the driver, so you may also need more memory for the driver.

On Fri, Aug 19, 2016 at 2:33 PM, Davies Liu  wrote:
> You are using lots of tiny executors (128 executor with only 2G
> memory), could you try with bigger executor (for example 16G x 16)?
>
> On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen  wrote:
>>
>> So I wrote some code to reproduce the problem.
>>
>> I assume here that a pipeline should be able to transform a categorical 
>> feature with a few million levels.
>> So I create a dataframe with the categorical feature (‘id’), apply a 
>> StringIndexer and OneHotEncoder transformer, and run a loop where I increase 
>> the amount of levels.
>> It breaks at 1.276.000 levels.
>>
>> Shall I report this as a ticket in JIRA?
>>
>> 
>>
>>
>> from pyspark.sql.functions import rand
>> from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
>> from pyspark.ml import Pipeline
>>
>> start_id = 10
>> n = 500
>> step = (n - start_id) / 25
>>
>> for i in xrange(start_id,start_id + n,step):
>> print "#\n {}".format(i)
>> dfr = (sqlContext
>>.range(start_id, start_id + i)
>>.withColumn(‘label', rand(seed=10))
>>.withColumn('feat2', rand(seed=101))
>> #.withColumn('normal', randn(seed=27))
>>).repartition(32).cache()
>> # dfr.select("id", rand(seed=10).alias("uniform"), 
>> randn(seed=27).alias("normal")).show()
>> dfr.show(1)
>> print "This dataframe has {0} rows (and therefore {0} levels will be one 
>> hot encoded)".format(dfr.count())
>>
>> categorical_feature  = ['id']
>> stages = []
>>
>> for c in categorical_feature:
>> stages.append(StringIndexer(inputCol=c, 
>> outputCol="{}Index".format(c)))
>> stages.append(OneHotEncoder(dropLast= False, inputCol = 
>> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>>
>> columns = ["{}OHE".format(x) for x in categorical_feature]
>> columns.append('feat2')
>>
>> assembler = VectorAssembler(
>> inputCols=columns,
>> outputCol="features")
>> stages.append(assembler)
>>
>> df2 = dfr
>>
>> pipeline = Pipeline(stages=stages)
>> pipeline_fitted = pipeline.fit(df2)
>> df3 = pipeline_fitted.transform(df2)
>> df3.show(1)
>> dfr.unpersist()
>>
>>
>> 
>>
>> Output:
>>
>>
>> #
>>  10
>> +--+---+---+
>> |id|label  |  feat2|
>> +--+---+---+
>> |183601|0.38693226548356197|0.04485291680169634|
>> +--+---+---+
>> only showing top 1 row
>>
>> This dataframe has 10 rows (and therefore 10 levels will be one hot 
>> encoded)
>> +--+---+---+---+++
>> |id|label  |  feat2|idIndex| 
>>   idOHE|features|
>> +--+---+---+---+++
>> |183601|
>> 0.38693226548356197|0.04485291680169634|83240.0|(10,[83240],[...|(11,[83240,10...|
>> +--+---+---+---+++
>> only showing top 1 row
>>
>> #
>>  296000
>> +--+---+---+
>> |id|label  |  feat2|
>> +--+---+---+
>> |137008| 0.2996020619810592|0.38693226548356197|
>> +--+---+---+
>> only showing top 1 row
>>
>> This dataframe has 296000 rows (and therefore 296000 levels will be one hot 
>> encoded)
>> +--+---+---+---+++
>> |id|label  |  feat2|idIndex| 
>>   idOHE|features|
>> +--+---+---+---+++
>> |137008| 
>> 0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
>> +--+---+---+---+++
>> only showing top 1 row
>>
>> #
>>  492000
>> +--+---+---+
>> |id|label  |  feat2|
>> +--+---+---+
>> |534351| 0.9450641392552516|0.23472935141246665|
>> +--+---+---+
>> only showing top 1 row
>>
>> This dataframe has 492000 rows (and therefore 492000 levels will be one hot 
>> encoded)
>> 

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-19 Thread Davies Liu
You are using lots of tiny executors (128 executors with only 2G of memory
each); could you try bigger executors (for example 16 executors with 16G each)?

On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen  wrote:
>
> So I wrote some code to reproduce the problem.
>
> I assume here that a pipeline should be able to transform a categorical 
> feature with a few million levels.
> So I create a dataframe with the categorical feature (‘id’), apply a 
> StringIndexer and OneHotEncoder transformer, and run a loop where I increase 
> the amount of levels.
> It breaks at 1.276.000 levels.
>
> Shall I report this as a ticket in JIRA?
>
> 
>
>
> from pyspark.sql.functions import rand
> from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
> from pyspark.ml import Pipeline
>
> start_id = 10
> n = 500
> step = (n - start_id) / 25
>
> for i in xrange(start_id,start_id + n,step):
> print "#\n {}".format(i)
> dfr = (sqlContext
>.range(start_id, start_id + i)
>.withColumn(‘label', rand(seed=10))
>.withColumn('feat2', rand(seed=101))
> #.withColumn('normal', randn(seed=27))
>).repartition(32).cache()
> # dfr.select("id", rand(seed=10).alias("uniform"), 
> randn(seed=27).alias("normal")).show()
> dfr.show(1)
> print "This dataframe has {0} rows (and therefore {0} levels will be one 
> hot encoded)".format(dfr.count())
>
> categorical_feature  = ['id']
> stages = []
>
> for c in categorical_feature:
> stages.append(StringIndexer(inputCol=c, 
> outputCol="{}Index".format(c)))
> stages.append(OneHotEncoder(dropLast= False, inputCol = 
> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>
> columns = ["{}OHE".format(x) for x in categorical_feature]
> columns.append('feat2')
>
> assembler = VectorAssembler(
> inputCols=columns,
> outputCol="features")
> stages.append(assembler)
>
> df2 = dfr
>
> pipeline = Pipeline(stages=stages)
> pipeline_fitted = pipeline.fit(df2)
> df3 = pipeline_fitted.transform(df2)
> df3.show(1)
> dfr.unpersist()
>
>
> 
>
> Output:
>
>
> #
>  10
> +--+---+---+
> |id|label  |  feat2|
> +--+---+---+
> |183601|0.38693226548356197|0.04485291680169634|
> +--+---+---+
> only showing top 1 row
>
> This dataframe has 10 rows (and therefore 10 levels will be one hot 
> encoded)
> +--+---+---+---+++
> |id|label  |  feat2|idIndex|  
>  idOHE|features|
> +--+---+---+---+++
> |183601|
> 0.38693226548356197|0.04485291680169634|83240.0|(10,[83240],[...|(11,[83240,10...|
> +--+---+---+---+++
> only showing top 1 row
>
> #
>  296000
> +--+---+---+
> |id|label  |  feat2|
> +--+---+---+
> |137008| 0.2996020619810592|0.38693226548356197|
> +--+---+---+
> only showing top 1 row
>
> This dataframe has 296000 rows (and therefore 296000 levels will be one hot 
> encoded)
> +--+---+---+---+++
> |id|label  |  feat2|idIndex|  
>  idOHE|features|
> +--+---+---+---+++
> |137008| 
> 0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
> +--+---+---+---+++
> only showing top 1 row
>
> #
>  492000
> +--+---+---+
> |id|label  |  feat2|
> +--+---+---+
> |534351| 0.9450641392552516|0.23472935141246665|
> +--+---+---+
> only showing top 1 row
>
> This dataframe has 492000 rows (and therefore 492000 levels will be one hot 
> encoded)
> +--+---+---+---+++
> |id|label  |  feat2|idIndex|  
>  idOHE|features|
> +--+---+---+---+++
> |534351| 

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-19 Thread Ben Teeuwen
So I wrote some code to reproduce the problem.

I assume here that a pipeline should be able to transform a categorical feature 
with a few million levels.
So I create a dataframe with the categorical feature ('id'), apply a 
StringIndexer and a OneHotEncoder transformer, and run a loop where I increase 
the number of levels.
It breaks at 1,276,000 levels.

Shall I report this as a ticket in JIRA?




from pyspark.sql.functions import rand
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

start_id = 100000
n = 5000000
step = (n - start_id) / 25

for i in xrange(start_id, start_id + n, step):
    print "#\n {}".format(i)
    dfr = (sqlContext
           .range(start_id, start_id + i)
           .withColumn('label', rand(seed=10))
           .withColumn('feat2', rand(seed=101))
           # .withColumn('normal', randn(seed=27))
           ).repartition(32).cache()
    # dfr.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
    dfr.show(1)
    print "This dataframe has {0} rows (and therefore {0} levels will be one hot encoded)".format(dfr.count())

    categorical_feature = ['id']
    stages = []

    for c in categorical_feature:
        stages.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
        stages.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))

    columns = ["{}OHE".format(x) for x in categorical_feature]
    columns.append('feat2')

    assembler = VectorAssembler(
        inputCols=columns,
        outputCol="features")
    stages.append(assembler)

    df2 = dfr

    pipeline = Pipeline(stages=stages)
    pipeline_fitted = pipeline.fit(df2)
    df3 = pipeline_fitted.transform(df2)
    df3.show(1)
    dfr.unpersist()




Output:

#
 100000
+------+-------------------+-------------------+
|    id|              label|              feat2|
+------+-------------------+-------------------+
|183601|0.38693226548356197|0.04485291680169634|
+------+-------------------+-------------------+
only showing top 1 row

This dataframe has 100000 rows (and therefore 100000 levels will be one hot encoded)
+------+-------------------+-------------------+-------+--------------------+--------------------+
|    id|              label|              feat2|idIndex|               idOHE|            features|
+------+-------------------+-------------------+-------+--------------------+--------------------+
|183601|0.38693226548356197|0.04485291680169634|83240.0|(100000,[83240],[...|(100001,[83240,10...|
+------+-------------------+-------------------+-------+--------------------+--------------------+
only showing top 1 row

#
 296000
+------+-------------------+-------------------+
|    id|              label|              feat2|
+------+-------------------+-------------------+
|137008| 0.2996020619810592|0.38693226548356197|
+------+-------------------+-------------------+
only showing top 1 row

This dataframe has 296000 rows (and therefore 296000 levels will be one hot encoded)
+------+-------------------+-------------------+-------+--------------------+--------------------+
|    id|              label|              feat2|idIndex|               idOHE|            features|
+------+-------------------+-------------------+-------+--------------------+--------------------+
|137008| 0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
+------+-------------------+-------------------+-------+--------------------+--------------------+
only showing top 1 row

#
 492000
+------+-------------------+-------------------+
|    id|              label|              feat2|
+------+-------------------+-------------------+
|534351| 0.9450641392552516|0.23472935141246665|
+------+-------------------+-------------------+
only showing top 1 row

This dataframe has 492000 rows (and therefore 492000 levels will be one hot encoded)
+------+-------------------+-------------------+-------+--------------------+--------------------+
|    id|              label|              feat2|idIndex|               idOHE|            features|
+------+-------------------+-------------------+-------+--------------------+--------------------+
|534351| 0.9450641392552516|0.23472935141246665| 3656.0|(492000,[3656],[1...|(492001,[3656,492...|
+------+-------------------+-------------------+-------+--------------------+--------------------+
only showing top 1 row

#
 688000
+--+---+--+
|id|label  | feat2|
+--+---+--+
|573008| 

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-11 Thread Nick Pentreath
OK, interesting. I'd be interested to see how it compares.

By the way, the feature size you select for the hasher should be a power of
2 (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes
are evenly distributed (see the section on HashingTF under
http://spark.apache.org/docs/latest/ml-features.html#tf-idf).
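
For reference, that would look something like the below (a sketch: df is assumed to
already carry the ["feature_name=feature_value"] arrays from the example further down
the thread, and 2**26 is just one of the suggested sizes):

from pyspark.ml.feature import HashingTF

# 2**26 = 67,108,864 buckets: a power of two comfortably above the 56m distinct values
hasher = HashingTF(numFeatures=2 ** 26, inputCol="features", outputCol="features_vector")
hashed = hasher.transform(df)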

On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen  wrote:

> Thanks Nick, I played around with the hashing trick. When I set
> numFeatures to the amount of distinct values for the largest sparse
> feature, I ended up with half of them colliding. When raising the
> numFeatures to have less collisions, I soon ended up with the same memory
> problems as before. To be honest, I didn’t test the impact of having more
> or less collisions on the quality of the predictions, but tunnel visioned
> into getting it to work with the full sparsity.
>
> Before I worked in RDD land; zipWithIndex on rdd with distinct values +
> one entry ‘missing’ for missing values during predict, collectAsMap,
> broadcast the map, udf generating sparse vector, assembling the vectors
> manually). To move into dataframe land, I wrote:
>
> def getMappings(mode):
> mappings = defaultdict(dict)
> max_index = 0
> for c in cat_int[:]:# for every categorical variable
>
> logging.info("starting with {}".format(c))
> if mode == 'train':
> grouped = (df2
> .groupBy(c).count().orderBy('count', ascending = False)  #
> get counts, ordered from largest to smallest
> .selectExpr("*", "1 as n")  # prepare for window
> function summing up 1s before current row to create a RANK
> .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS
> BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS
> index".format(max_index))
> .drop('n') # drop the column with static 1 values used for
> the cumulative sum
> )
> logging.info("Got {} rows.".format(grouped.count()))
> grouped.show()
> logging.info('getting max')
> max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda
> r: r.t).first()  # update the max index so next categorical feature starts
> with it.
> logging.info("max_index has become: {}".format(max_index))
> logging.info('adding missing value, so we also train on this
> and prediction data missing it. ')
> schema = grouped.schema
> logging.info(schema)
> grouped = grouped.union(spark.createDataFrame([('missing', 0,
> max_index + 1)], schema))  # add index for missing value for values during
> predict that are unseen during training.
> max_index += 1
> saveto = "{}/{}".format(path, c)
> logging.info("Writing to: {}".format(saveto))
> grouped.write.parquet(saveto, mode = 'overwrite')
>
> elif mode == 'predict':
> loadfrom = "{}/{}".format(path, c)
> logging.info("Reading from: {}".format(loadfrom))
> grouped = spark.read.parquet(loadfrom)
>
> logging.info("Adding to dictionary")
> mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d:
> (d[c], d['index'])).collectAsMap()  # build up dictionary to be broadcasted
> later on, used for creating sparse vectors
> max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r:
> r.t).first()
>
> logging.info("Sanity check for indexes:")
> for c in cat_int[:]:
> logging.info("{} min: {} max: {}".format(c,
> min(mappings[c].values()), max(mappings[c].values(   # some logging to
> confirm the indexes.
> logging.info("Missing value = {}".format(mappings[c]['missing']))
> return max_index, mappings
>
> I’d love to see the StringIndexer + OneHotEncoder transformers cope with
> missing values during prediction; for now I’ll work with the hacked stuff
> above :).
> (.. and I should compare the performance with using the hashing trick.)
>
> Ben
>
>
> On Aug 4, 2016, at 3:44 PM, Nick Pentreath 
> wrote:
>
> Sure, I understand there are some issues with handling this missing value
> situation in StringIndexer currently. Your workaround is not ideal but I
> see that it is probably the only mechanism available currently to avoid the
> problem.
>
> But the OOM issues seem to be more about the feature cardinality (so the
> size of the hashmap to store the feature <-> index mappings).
>
> A nice property of feature hashing is that it implicitly handles unseen
> category labels by setting the coefficient value to 0 (in the absence of a
> hash collision) - basically option 2 from H2O.
>
> Why is that? Well once you've trained your model you have a (sparse)
> N-dimensional weight vector that will be definition have 0s for unseen
> indexes. At test time, any feature that only appears in your test set or
> new data will be hashed to an index in the weight vector that has value 0.

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-11 Thread Ben Teeuwen
Thanks Nick, I played around with the hashing trick. When I set numFeatures to 
the number of distinct values for the largest sparse feature, I ended up with 
half of them colliding. When I raised numFeatures to get fewer collisions, I 
soon ran into the same memory problems as before. To be honest, I didn’t test 
the impact of more or fewer collisions on the quality of the predictions, but 
tunnel-visioned into getting it to work with the full sparsity.
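
(A rough back-of-the-envelope, assuming a uniform hash, matches that: with n values
and m buckets, the expected share of values landing in a bucket together with some
other value is about 1 - exp(-n/m), so roughly 63% when numFeatures equals the number
of distinct values, and it only falls off slowly as m grows:)

import math

n = 56e6                               # distinct values in the big column
for bits in (26, 27, 28, 30):
    m = 2.0 ** bits                    # power-of-two bucket counts
    colliding = 1 - math.exp(-n / m)   # expected share of values sharing a bucket
    print "2**{}: ~{:.0%} of values collide".format(bits, colliding)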

Before, I worked in RDD land: zipWithIndex on an RDD of the distinct values, plus one 
entry ‘missing’ for values unseen at predict time, collectAsMap, broadcast the 
map, a udf generating the sparse vector, and assembling the vectors manually. To move 
into dataframe land, I wrote:

def getMappings(mode):
    mappings = defaultdict(dict)
    max_index = 0
    for c in cat_int[:]:    # for every categorical variable

        logging.info("starting with {}".format(c))
        if mode == 'train':
            grouped = (df2
                .groupBy(c).count().orderBy('count', ascending = False)  # get counts, ordered from largest to smallest
                .selectExpr("*", "1 as n")  # prepare for window function summing up 1s before current row to create a RANK
                .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
                .drop('n')  # drop the column with static 1 values used for the cumulative sum
                )
            logging.info("Got {} rows.".format(grouped.count()))
            grouped.show()
            logging.info('getting max')
            max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()  # update the max index so the next categorical feature starts with it
            logging.info("max_index has become: {}".format(max_index))
            logging.info('adding missing value, so we also train on this and prediction data missing it.')
            schema = grouped.schema
            logging.info(schema)
            grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))  # add an index for the missing value, for values during predict that are unseen during training
            max_index += 1
            saveto = "{}/{}".format(path, c)
            logging.info("Writing to: {}".format(saveto))
            grouped.write.parquet(saveto, mode = 'overwrite')

        elif mode == 'predict':
            loadfrom = "{}/{}".format(path, c)
            logging.info("Reading from: {}".format(loadfrom))
            grouped = spark.read.parquet(loadfrom)

        logging.info("Adding to dictionary")
        mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()  # dictionary to be broadcasted later on, used for creating sparse vectors
        max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()

    logging.info("Sanity check for indexes:")
    for c in cat_int[:]:
        logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))  # some logging to confirm the indexes
        logging.info("Missing value = {}".format(mappings[c]['missing']))
    return max_index, mappings
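
The "assembling the vectors manually" part then looks roughly like this (a sketch
from memory: sc, df2, cat_int and path are as above, and the exact udf wiring is
illustrative rather than the exact code I run):

from pyspark.sql.functions import udf, struct
from pyspark.ml.linalg import Vectors, VectorUDT

max_index, mappings = getMappings('train')
mappings_bc = sc.broadcast(mappings)      # ship the {column: {value: index}} dicts once
vector_size = max_index + 1               # indexes are globally unique across the columns

def assemble(row):
    # one sparse vector covering all categorical columns at once
    idxs = []
    for c in cat_int:
        m = mappings_bc.value[c]
        idxs.append(m.get(getattr(row, c), m['missing']))
    return Vectors.sparse(vector_size, sorted(idxs), [1.0] * len(idxs))

assemble_udf = udf(assemble, VectorUDT())
df3 = df2.withColumn("features", assemble_udf(struct(*cat_int)))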

I’d love to see the StringIndexer + OneHotEncoder transformers cope with 
missing values during prediction; for now I’ll work with the hacked stuff above 
:).
(.. and I should compare the performance with using the hashing trick.)

Ben

> On Aug 4, 2016, at 3:44 PM, Nick Pentreath  wrote:
> 
> Sure, I understand there are some issues with handling this missing value 
> situation in StringIndexer currently. Your workaround is not ideal but I see 
> that it is probably the only mechanism available currently to avoid the 
> problem.
> 
> But the OOM issues seem to be more about the feature cardinality (so the size 
> of the hashmap to store the feature <-> index mappings).
> 
> A nice property of feature hashing is that it implicitly handles unseen 
> category labels by setting the coefficient value to 0 (in the absence of a 
> hash collision) - basically option 2 from H2O.
> 
> Why is that? Well once you've trained your model you have a (sparse) 
> N-dimensional weight vector that will be definition have 0s for unseen 
> indexes. At test time, any feature that only appears in your test set or new 
> data will be hashed to an index in the weight vector that has value 0.
> 
> So it could be useful for both of your problems.
> 
> On Thu, 4 Aug 2016 at 15:25 Ben Teeuwen  > wrote:
> Hi Nick, 
> 
> Thanks for the suggestion. Reducing the dimensionality is an option, thanks, 
> but let’s say I really want to do this :).
> 
> The reason why it’s so big is that I’m unifying my training and test data, 
> and I don’t want to drop rows in the test data just because one of the 
> features was missing in the training data. 

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-04 Thread Nick Pentreath
Sure, I understand there are some issues with handling this missing value
situation in StringIndexer currently. Your workaround is not ideal but I
see that it is probably the only mechanism available currently to avoid the
problem.

But the OOM issues seem to be more about the feature cardinality (so the
size of the hashmap to store the feature <-> index mappings).

A nice property of feature hashing is that it implicitly handles unseen
category labels by setting the coefficient value to 0 (in the absence of a
hash collision) - basically option 2 from H2O.

Why is that? Well, once you've trained your model you have a (sparse)
N-dimensional weight vector that will by definition have 0s for unseen
indexes. At test time, any feature that only appears in your test set or
new data will be hashed to an index in the weight vector that has value 0.

So it could be useful for both of your problems.

On Thu, 4 Aug 2016 at 15:25 Ben Teeuwen  wrote:

> Hi Nick,
>
> Thanks for the suggestion. Reducing the dimensionality is an option,
> thanks, but let’s say I really want to do this :).
>
> The reason why it’s so big is that I’m unifying my training and test data,
> and I don’t want to drop rows in the test data just because one of the
> features was missing in the training data. I wouldn’t need this
>  workaround, if I had a better *strategy in Spark for dealing with
> missing levels. *How Spark can deal with it:
>
>
> *"Additionally, there are two strategies regarding how StringIndexer will
> handle unseen labels when you have fit aStringIndexer on one dataset and
> then use it to transform another:*
>
> * • throw an exception (which is the default)*
> * • skip the row containing the unseen label entirely"*
> http://spark.apache.org/docs/2.0.0/ml-features.html#stringindexer
>
> I like how *H2O* handles this;
>
> *"What happens during prediction if the new sample has categorical levels
> not seen in training? The value will be filled with either special
> missing level (if trained with missing values and missing_value_handling
> was set to MeanImputation) or 0.”*
>
> https://github.com/h2oai/h2o-3/blob/master/h2o-docs/src/product/tutorials/datascience/DataScienceH2O-Dev.md
>
> So assuming I need to unify the data, make it huge, and trying out more in
> scala, I see *these kinds of errors*:
> _
>
> scala> feedBack(s"Applying string indexers: fitting")
> 2016-08-04 10:13:20() | Applying string indexers: fitting
>
> scala> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
> pipelined: org.apache.spark.ml.Pipeline = pipeline_83be3b554e3a
>
> scala> val dfFitted = pipelined.fit(df)
> dfFitted: org.apache.spark.ml.PipelineModel = pipeline_83be3b554e3a
>
> scala> feedBack(s"Applying string indexers: transforming")
> 2016-08-04 10:17:29() | Applying string indexers: transforming
>
> scala> var df2 = dfFitted.transform(df)
> df2: org.apache.spark.sql.DataFrame = [myid: string, feature1: int ... 16
> more fields]
>
> scala>
>
> scala> feedBack(s"Applying OHE: fitting")
> 2016-08-04 10:18:07() | Applying OHE: fitting
>
> scala> val pipelined2 = new Pipeline().setStages(stagesOhe.toArray)
> pipelined2: org.apache.spark.ml.Pipeline = pipeline_ba7922a29322
>
> scala> val dfFitted2 = pipelined2.fit(df2)
> 16/08/04 10:21:41 WARN DFSClient: Slow ReadProcessor read fields took
> 85735ms (threshold=3ms); ack: seqno: -2 status: SUCCESS status: ERROR
> downstreamAckTimeNanos: 0, targets: [10.10.66.13:50010, 10.10.95.11:50010,
> 10.10.95.29:50010]
> 16/08/04 10:21:41 WARN DFSClient: DFSOutputStream ResponseProcessor
> exception  for block
> BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993380377
> java.io.IOException: Bad response ERROR for block
> BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 from
> datanode 10.10.95.11:50010
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
> 16/08/04 10:21:41 WARN DFSClient: Error Recovery for block
> BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 in
> pipeline 10.10.66.13:50010, 10.10.95.11:50010, 10.10.95.29:50010: bad
> datanode 10.10.95.11:50010
> dfFitted2: org.apache.spark.ml.PipelineModel = pipeline_ba7922a29322
>
> scala> feedBack(s"Applying OHE: transforming")
> 2016-08-04 10:29:12() | Applying OHE: transforming
>
> scala> df2 = dfFitted2.transform(df2).cache()
> 16/08/04 10:34:18 WARN DFSClient: DFSOutputStream ResponseProcessor
> exception  for block
> BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993414608
> java.io.EOFException: Premature EOF: no length prefix available
> at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2203)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> 16/08/04 10:34:18 WARN 

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-04 Thread Ben Teeuwen
Hi Nick, 

Thanks for the suggestion. Reducing the dimensionality is an option, but let’s 
say I really want to do this :).

The reason why it’s so big is that I’m unifying my training and test data, and 
I don’t want to drop rows in the test data just because one of the features was 
missing in the training data. I wouldn’t need this workaround if I had a better 
strategy in Spark for dealing with missing levels. How Spark can deal with it:

"Additionally, there are two strategies regarding how StringIndexer will handle 
unseen labels when you have fit aStringIndexer on one dataset and then use it 
to transform another:
• throw an exception (which is the default)
• skip the row containing the unseen label entirely"
http://spark.apache.org/docs/2.0.0/ml-features.html#stringindexer 
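
(If I read the API right, the "skip" strategy is selected through the handleInvalid
param; a sketch, using the bigfeature column from my earlier attempts. It drops the
offending rows, which is exactly what I want to avoid:)

from pyspark.ml.feature import StringIndexer

# rows whose label was unseen when the indexer was fit are dropped at transform time
indexer = StringIndexer(inputCol="bigfeature", outputCol="bigfeatureIndex",
                        handleInvalid="skip")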
 

I like how H2O handles this; 

"What happens during prediction if the new sample has categorical levels not 
seen in training? The value will be filled with either special missing level 
(if trained with missing values and missing_value_handling was set to 
MeanImputation) or 0.”
https://github.com/h2oai/h2o-3/blob/master/h2o-docs/src/product/tutorials/datascience/DataScienceH2O-Dev.md
 


So, assuming I need to unify the data and make it huge, and trying out more in 
Scala, I see these kinds of errors:
_

scala> feedBack(s"Applying string indexers: fitting")
2016-08-04 10:13:20() | Applying string indexers: fitting

scala> val pipelined = new Pipeline().setStages(stagesIndex.toArray)
pipelined: org.apache.spark.ml.Pipeline = pipeline_83be3b554e3a

scala> val dfFitted = pipelined.fit(df)
dfFitted: org.apache.spark.ml.PipelineModel = pipeline_83be3b554e3a

scala> feedBack(s"Applying string indexers: transforming")
2016-08-04 10:17:29() | Applying string indexers: transforming

scala> var df2 = dfFitted.transform(df)
df2: org.apache.spark.sql.DataFrame = [myid: string, feature1: int ... 16 more 
fields]

scala>

scala> feedBack(s"Applying OHE: fitting")
2016-08-04 10:18:07() | Applying OHE: fitting

scala> val pipelined2 = new Pipeline().setStages(stagesOhe.toArray)
pipelined2: org.apache.spark.ml.Pipeline = pipeline_ba7922a29322

scala> val dfFitted2 = pipelined2.fit(df2)
16/08/04 10:21:41 WARN DFSClient: Slow ReadProcessor read fields took 85735ms 
(threshold=30000ms); ack: seqno: -2 status: SUCCESS status: ERROR 
downstreamAckTimeNanos: 0, targets: [10.10.66.13:50010, 10.10.95.11:50010, 
10.10.95.29:50010]
16/08/04 10:21:41 WARN DFSClient: DFSOutputStream ResponseProcessor exception  
for block BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993380377
java.io.IOException: Bad response ERROR for block 
BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 from 
datanode 10.10.95.11:50010
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
16/08/04 10:21:41 WARN DFSClient: Error Recovery for block 
BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993380377 in 
pipeline 10.10.66.13:50010, 10.10.95.11:50010, 10.10.95.29:50010: bad datanode 
10.10.95.11:50010
dfFitted2: org.apache.spark.ml.PipelineModel = pipeline_ba7922a29322

scala> feedBack(s"Applying OHE: transforming")
2016-08-04 10:29:12() | Applying OHE: transforming

scala> df2 = dfFitted2.transform(df2).cache()
16/08/04 10:34:18 WARN DFSClient: DFSOutputStream ResponseProcessor exception  
for block BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993414608
java.io.EOFException: Premature EOF: no length prefix available
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2203)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
16/08/04 10:34:18 WARN DFSClient: Error Recovery for block 
BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993414608 in 
pipeline 10.10.66.13:50010, 10.10.66.3:50010, 10.10.95.29:50010: bad datanode 
10.10.66.13:50010
16/08/04 10:36:03 WARN DFSClient: Slow ReadProcessor read fields took 74146ms 
(threshold=30000ms); ack: seqno: -2 status: SUCCESS status: SUCCESS status: 
ERROR downstreamAckTimeNanos: 0, targets: [10.10.66.3:50010, 10.10.66.1:50010, 
10.10.95.29:50010]
16/08/04 10:36:03 WARN DFSClient: DFSOutputStream ResponseProcessor exception  
for block BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993467488
java.io.IOException: Bad response ERROR for block 
BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 from 
datanode 10.10.95.29:50010
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
16/08/04 10:36:03 WARN DFSClient: Error 

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-04 Thread Nick Pentreath
Hi Ben

Perhaps with this size cardinality it is worth looking at feature hashing
for your problem. Spark has the HashingTF transformer that works on a
column of "sentences" (i.e. [string]).

For categorical features you can hack it a little by converting your
feature value into a ["feature_name=feature_value"] representation. Then
HashingTF can be used as is. Note you can also just do ["feature_value"],
but the former would allow you, with a bit of munging, to hash all your
feature columns at the same time.

The advantage is speed and bounded memory footprint. The disadvantages
include (i) no way to reverse the mapping from feature_index ->
feature_name; (ii) potential for hash collisions (can be helped a bit by
increasing your feature vector size).

Here is a minimal example:

In [1]: from pyspark.ml.feature import StringIndexer, OneHotEncoder,
HashingTF
In [2]: from pyspark.sql.types import StringType, ArrayType
In [3]: from pyspark.sql.functions import udf

In [4]: df = spark.createDataFrame([(0, "foo"), (1, "bar"), (2, "foo"), (3,
"baz")], ["id", "feature"])

In [5]: to_array = udf(lambda s: ["feature=%s" % s],
ArrayType(StringType()))

In [6]: df = df.withColumn("features", to_array("feature"))

In [7]: df.show()
+---+---+-+
| id|feature| features|
+---+---+-+
|  0|foo|[feature=foo]|
|  1|bar|[feature=bar]|
|  2|foo|[feature=foo]|
|  3|baz|[feature=baz]|
+---+---+-+

In [8]: indexer = StringIndexer(inputCol="feature",
outputCol="feature_index")

In [9]: indexed = indexer.fit(df).transform(df)

In [10]: encoder = OneHotEncoder(dropLast=False, inputCol="feature_index",
outputCol="feature_vector")

In [11]: encoded = encoder.transform(indexed)

In [12]: encoded.show()
+---+---+-+-+--+
| id|feature| features|feature_index|feature_vector|
+---+---+-+-+--+
|  0|foo|[feature=foo]|  0.0| (3,[0],[1.0])|
|  1|bar|[feature=bar]|  2.0| (3,[2],[1.0])|
|  2|foo|[feature=foo]|  0.0| (3,[0],[1.0])|
|  3|baz|[feature=baz]|  1.0| (3,[1],[1.0])|
+---+---+-+-+--+

In [22]: hasher = HashingTF(numFeatures=2**8, inputCol="features",
outputCol="features_vector")

In [23]: hashed = hasher.transform(df)

In [24]: hashed.show()
+---+---+-+-+
| id|feature| features|  features_vector|
+---+---+-+-+
|  0|foo|[feature=foo]| (256,[59],[1.0])|
|  1|bar|[feature=bar]|(256,[219],[1.0])|
|  2|foo|[feature=foo]| (256,[59],[1.0])|
|  3|baz|[feature=baz]| (256,[38],[1.0])|
+---+---+-+-+

On Thu, 4 Aug 2016 at 10:07 Ben Teeuwen  wrote:

> I raised driver memory to 30G and maxresultsize to 25G, this time in
> pyspark.
>
> *Code run:*
>
> cat_int  = ['bigfeature']
>
> stagesIndex = []
> stagesOhe   = []
> for c in cat_int:
>   stagesIndex.append(StringIndexer(inputCol=c,
> outputCol="{}Index".format(c)))
>   stagesOhe.append(OneHotEncoder(dropLast= False, inputCol =
> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>
> df2 = df
>
> for i in range(len(stagesIndex)):
>   logging.info("Starting with {}".format(cat_int[i]))
>   stagesIndex[i].fit(df2)
>   logging.info("Fitted. Now transforming:")
>   df2 = stagesIndex[i].fit(df2).transform(df2)
>   logging.info("Transformed. Now showing transformed:")
>   df2.show()
>   logging.info("OHE")
>   df2 = stagesOhe[i].transform(df2)
>   logging.info("Fitted. Now showing OHE:")
>   df2.show()
>
> *Now I get error:*
>
> 2016-08-04 08:53:44,839 INFO   Starting with bigfeature
> [57/7074]
> ukStringIndexer_442b8e11e3294de9b83a
> 2016-08-04 09:06:18,147 INFO   Fitted. Now transforming:
> 16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 -
> Cannot receive any reply in 120 seconds. This timeout is controlled by
> spark.rpc.askTimeout
> org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120
> seconds. This timeout is controlled by spark.rpc.askTimeout
> at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
> at scala.util.Try$.apply(Try.scala:192)
> at scala.util.Failure.recover(Try.scala:216)
> at
> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
> at
> scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
> at 

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-04 Thread Ben Teeuwen
I raised driver memory to 30G and maxResultSize to 25G, this time in pyspark. 

Code run:

cat_int  = ['bigfeature']

stagesIndex = []
stagesOhe   = []
for c in cat_int:
  stagesIndex.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
  stagesOhe.append(OneHotEncoder(dropLast= False, inputCol = 
"{}Index".format(c), outputCol = "{}OHE".format(c)))

df2 = df

for i in range(len(stagesIndex)):
  logging.info("Starting with {}".format(cat_int[i]))
  stagesIndex[i].fit(df2)
  logging.info("Fitted. Now transforming:")
  df2 = stagesIndex[i].fit(df2).transform(df2)
  logging.info("Transformed. Now showing transformed:")
  df2.show()
  logging.info("OHE")
  df2 = stagesOhe[i].transform(df2)
  logging.info("Fitted. Now showing OHE:")
  df2.show()

Now I get this error:

2016-08-04 08:53:44,839 INFO   Starting with bigfeature   
ukStringIndexer_442b8e11e3294de9b83a
2016-08-04 09:06:18,147 INFO   Fitted. Now transforming:
16/08/04 09:10:35 WARN BlockManagerMaster: Failed to remove shuffle 3 - Cannot 
receive any reply in 120 seconds. This timeout is controlled by 
spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 
seconds. This timeout is controlled by spark.rpc.askTimeout
at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Failure.recover(Try.scala:216)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at 
scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at 
scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at 
scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
at 
org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
at 
org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 

at 

OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-03 Thread Ben Teeuwen
Hi,

I want to one hot encode a column containing 56 million distinct values. My 
dataset is 800m rows + 17 columns.
I first apply a StringIndexer, but it already breaks there, giving an OOM Java 
heap space error.

I launch my app on YARN with:
/opt/spark/2.0.0/bin/spark-shell --executor-memory 10G --num-executors 128 
--executor-cores 2 --driver-memory 12G --conf spark.driver.maxResultSize=8G

After grabbing the data, I run:

val catInts = Array(“bigfeature”)

val stagesIndex = scala.collection.mutable.ArrayBuffer.empty[StringIndexer]
val stagesOhe = scala.collection.mutable.ArrayBuffer.empty[OneHotEncoder]
for (c <- catInts) {
  println(s"starting with $c")
  val i = new StringIndexer()
.setInputCol(c)
.setOutputCol(s"${c}Index")
  stagesIndex += i

  val o = new OneHotEncoder()
.setDropLast(false)
.setInputCol(s"${c}Index")
.setOutputCol(s"${c}OHE")
  stagesOhe += o
}

println(s"Applying string indexers: fitting")
val pipelined = new Pipeline().setStages(stagesIndex.toArray)
val dfFitted = pipelined.fit(df)


Then, the application master shows a "countByValue at StringIndexer.scala" 
taking 1.8 minutes (so very fast). 
Afterwards, the shell console hangs for a while. What is it doing now? After 
some time, it shows:

scala> val dfFitted = pipelined.fit(df)
  
java.lang.OutOfMemoryError: Java heap space
  at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
  at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
  at 
org.apache.spark.util.collection.OpenHashMap$$anonfun$1.apply$mcVI$sp(OpenHashMap.scala:159)
  at org.apache.spark.util.collection.OpenHashSet.rehash(OpenHashSet.scala:230)
  at 
org.apache.spark.util.collection.OpenHashSet.rehashIfNeeded(OpenHashSet.scala:167)
  at 
org.apache.spark.util.collection.OpenHashMap$mcD$sp.update$mcD$sp(OpenHashMap.scala:86)
  at 
org.apache.spark.ml.feature.StringIndexerModel.<init>(StringIndexer.scala:137)
  at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:93)
  at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66)
  at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
  at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at 
scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
  at 
scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
  at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
  ... 16 elided