Try reducing the number of workers to 2, and increasing their memory up to 6GB.
However, I've seen mention of a bug in the PySpark API when calling head() on a DataFrame in Spark 1.4 and 1.5.0 that causes a big performance hit:
https://issues.apache.org/jira/browse/SPARK-10731
It's fixed in Spark 1.5.1, which was released yesterday, so maybe try upgrading.
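Something along these lines should apply those settings in local mode (an untested sketch, assuming you create the context yourself in the notebook; note that spark.driver.memory only takes effect if it is supplied before the driver JVM starts, hence the PYSPARK_SUBMIT_ARGS line):

import os
from pyspark import SparkConf, SparkContext

# Driver memory has to be supplied before the JVM is launched; setting it via SparkConf
# afterwards is ignored. The trailing "pyspark-shell" is required by the PySpark gateway.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 6g pyspark-shell"

conf = (SparkConf()
        .setAppName("cleankaggle")
        .setMaster("local[2]")                      # 2 worker threads instead of 4
        .set("spark.python.worker.memory", "6g"))   # memory per Python worker before spilling
sc = SparkContext(conf=conf)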
Ewan
-----Original Message-----
From: camelia [mailto:came...@chalmers.se]
Sent: 30 September 2015 10:51
To: user@spark.apache.org
Subject: Need for advice - performance improvement and out of memory resolution
Hello,
I am working on a machine learning project, currently using spark-1.4.1-bin-hadoop2.6 in local mode on a laptop (Ubuntu 14.04 on a Dell laptop with an i7-5600 @ 2.6 GHz, 4 cores, 15.6 GB RAM). I should also mention that I am working in Python from an IPython notebook.
I face the following problem: when working with a DataFrame created from a CSV file (2.7 GB) with the schema inferred (1900 features), it takes Spark 3:30 minutes to count the 145231 rows using 4 cores. Even longer times are recorded for computing one feature's statistics, for example:
START AT: 2015-09-21 08:56:41.136947

+-------+------------------+
|summary|          VAR_1933|
+-------+------------------+
|  count|            145231|
|   mean| 8849.839111484464|
| stddev|3175.7863998269395|
|    min|                 0|
|    max|                  |
+-------+------------------+

FINISH AT: 2015-09-21 09:02:49.452260
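Roughly, the statistics above come from calls like the following sketch (the file path is a placeholder, and the spark-csv reader is shown as an example of the load step):

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Load the 2.7 GB CSV with schema inference (~1900 columns); placeholder path
df = (sqlContext.read
      .format("com.databricks.spark.csv")
      .options(header="true", inferSchema="true")
      .load("data/train.csv"))

print(df.count())                # 145231 rows, about 3:30 min on local[4]
df.describe("VAR_1933").show()   # prints the summary table above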
So, my first question is: what configuration parameters should I set in order to improve this performance?
I tried some explicit configuration in the IPython notebook, but specifying resources explicitly when creating the Spark configuration resulted in worse performance. I mean:

config = SparkConf().setAppName("cleankaggle").setMaster("local[4]") \
    .set("spark.jars", jar_path)

ran twice as fast as:

config = SparkConf().setAppName("cleankaggle").setMaster("local[4]") \
    .set("spark.jars", jar_path) \
    .set("spark.driver.memory", "2g") \
    .set("spark.python.worker.memory", "3g")
Secondly, when I do the one-hot encoding (I tried two different ways of keeping the results), I never get as far as showing the head(1) of the resulting DataFrame. I use the following function:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

def OHE_transform(categ_feat, df_old):
    # Index the categorical column, then one-hot encode the resulting indices
    outputcolname = categ_feat + "_ohe_index"
    outputcolvect = categ_feat + "_ohe_vector"
    stringIndexer = StringIndexer(inputCol=categ_feat, outputCol=outputcolname)
    indexed = stringIndexer.fit(df_old).transform(df_old)
    encoder = OneHotEncoder(inputCol=outputcolname, outputCol=outputcolvect)
    encoded = encoder.transform(indexed)
    return encoded
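For illustration, the function can be exercised on a small toy DataFrame like this (a sketch, assuming a SQLContext named sqlContext):

# Toy example: one string column, three rows
toy = sqlContext.createDataFrame([("a",), ("b",), ("a",)], ["VAR_A"])
OHE_transform("VAR_A", toy).select("VAR_A", "VAR_A_ohe_index", "VAR_A_ohe_vector").show()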
The two ways of keeping the results are shown below:
1)
result = OHE_transform(list_top_feat[0], df_top_categ)
for item in list_top_feat[1:]:
    result = OHE_transform(item, result)

result.head(1)
2)
df_result = OHE_transform("VAR_A", df_top_categ)
df_result_1 = OHE_transform("VAR_B", df_result)
df_result_2 = OHE_transform("VAR_C", df_result_1)
...
df_result_12 = OHE_transform("VAR_X", df_result_11)
df_result_12.head(1)
In the first approach, at the third iteration of the for loop, when it was supposed to print head(1), the IPython notebook stayed in the "Kernel busy" state for several hours, and I eventually interrupted the kernel.
The second approach managed to go through all the transformations (note that here I removed the intermediate head(1) prints), but it gave an "out of memory" error at the single, final head(1), which I paste below:
===
df_result_12.head(1)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-...> in <module>()
----> 1 df_result_12.head(1)

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
    649             rs = self.head(1)
    650             return rs[0] if rs else None
--> 651         return self.take(n)
    652
    653     @ignore_unicode_prefix

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in take(self, num)
    305         [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
    306         """
--> 307         return self.limit(num).collect()
    308
    309     @ignore_unicode_prefix

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in collect(self)
    279         """
    280         with SCCallSiteSync(self._sc) as css:
--> 281             port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
    282         rs = list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    283         cls = _create_cls(self.schema)

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self,