Hello,

I am working on a machine learning project, currently using 
spark-1.4.1-bin-hadoop2.6 in local mode on a laptop (Ubuntu 14.04 on a Dell 
laptop with an i7-5600 @ 2.6 GHz, 4 cores, 15.6 GB RAM). I work in Python from 
an IPython notebook.


I face the following problem: when working with a DataFrame created from a CSV 
file (2.7 GB) with an inferred schema (1900 features), Spark takes 3 minutes 30 
seconds to count the 145231 rows using 4 cores. Computing the statistics of a 
single feature takes even longer, for example:



--------------------------------------------START AT: 2015-09-21 08:56:41.136947

+-------+------------------+
|summary|          VAR_1933|
+-------+------------------+
|  count|            145231|
|   mean| 8849.839111484464|
| stddev|3175.7863998269395|
|    min|                 0|
|    max|              9999|
+-------+------------------+


--------------------------------------------FINISH AT: 2015-09-21 09:02:49.452260
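Timings like the one above can be taken with a small wrapper such as the following sketch. The helper name timed and the describe("VAR_1933") call in the usage comment are illustrative assumptions, not necessarily the exact code that produced the output above.

```python
from datetime import datetime


def timed(action):
    """Run a zero-argument callable, print start/finish stamps,
    and return (result, elapsed_seconds)."""
    start = datetime.now()
    print("START AT: %s" % start)
    result = action()
    finish = datetime.now()
    print("FINISH AT: %s" % finish)
    return result, (finish - start).total_seconds()


# Hypothetical usage in the notebook (df is the DataFrame read from the CSV):
# _, seconds = timed(lambda: df.describe("VAR_1933").show())
# _, seconds = timed(lambda: df.count())
```

Wrapping the action in a lambda matters because Spark only does the work when an action such as count() or show() is called.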




So, my first question would be what configuration parameters to set in order to 
improve this performance?

I tried some explicit configuration in the IPython notebook, but specifying 
resources explicitly when creating the Spark configuration made performance 
worse. Specifically:

config = (SparkConf().setAppName("cleankaggle").setMaster("local[4]")
          .set("spark.jars", jar_path))

ran twice as fast as:

config = (SparkConf().setAppName("cleankaggle").setMaster("local[4]")
          .set("spark.jars", jar_path)
          .set("spark.driver.memory", "2g")
          .set("spark.python.worker.memory", "3g"))
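Since a long chain of set() calls makes it easy to mistype a key, a sketch like the following keeps the settings in a dict and checks the keys before handing them to SparkConf. As far as I understand, SparkConf stores keys exactly as passed, so stray whitespace in a key would name a different, unknown setting. The helper names clean_settings and build_conf are hypothetical.

```python
def clean_settings(settings):
    """Return sorted (key, value) pairs with surrounding whitespace
    stripped from keys; reject keys that contain inner whitespace."""
    cleaned = []
    for key, value in sorted(settings.items()):
        stripped = key.strip()
        if any(ch.isspace() for ch in stripped):
            raise ValueError("whitespace inside config key: %r" % key)
        cleaned.append((stripped, value))
    return cleaned


def build_conf(app_name, master, settings):
    # Imported here so clean_settings can be used without Spark installed.
    from pyspark import SparkConf
    conf = SparkConf().setAppName(app_name).setMaster(master)
    conf.setAll(clean_settings(settings))
    return conf


# Hypothetical usage with the values from this post:
# config = build_conf("cleankaggle", "local[4]",
#                     {"spark.jars": jar_path,
#                      "spark.python.worker.memory": "3g"})
```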


****************************


Secondly, when I do the one-hot encoding (I tried two different ways of keeping 
the results), I never get as far as displaying head(1) of the resulting 
DataFrame. The encoding function is:

from pyspark.ml.feature import StringIndexer, OneHotEncoder

def OHE_transform(categ_feat, df_old):
    outputcolname = categ_feat + "_ohe_index"
    outputcolvect = categ_feat + "_ohe_vector"
    # Map each category string to a numeric index (fit scans df_old).
    stringIndexer = StringIndexer(inputCol=categ_feat, outputCol=outputcolname)
    indexed = stringIndexer.fit(df_old).transform(df_old)
    # Turn the numeric index into a one-hot vector column.
    encoder = OneHotEncoder(inputCol=outputcolname, outputCol=outputcolvect)
    encoded = encoder.transform(indexed)
    return encoded
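To make clear what the function is meant to compute, here is a toy pure-Python illustration of the two steps on a single column: index labels by descending frequency (which is how StringIndexer orders them), then one-hot encode the index. It is only a conceptual sketch. Spark's OneHotEncoder produces sparse vectors and, depending on version and parameters, may omit one category, which this toy version does not do.

```python
from collections import Counter


def string_index(values):
    """Map each label to an index, most frequent label first (index 0)."""
    ordered = [label for label, _ in Counter(values).most_common()]
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, size):
    """Dense one-hot vector of the given size with a 1.0 at position index."""
    vec = [0.0] * size
    vec[index] = 1.0
    return vec


column = ["b", "a", "b", "c", "b", "a"]   # made-up sample values
mapping = string_index(column)            # "b" is most frequent, so it gets 0
encoded = [one_hot(mapping[v], len(mapping)) for v in column]
```

The vector width equals the number of distinct labels, so a column with many distinct values yields very wide vectors.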


The two ways of keeping the results are shown below:

1)

result = OHE_transform(list_top_feat[0], df_top_categ)
for item in list_top_feat[1:]:
    result = OHE_transform(item, result)
    result.head(1)


2)

df_result = OHE_transform("VAR_A", df_top_categ)
df_result_1 = OHE_transform("VAR_B", df_result)
df_result_2 = OHE_transform("VAR_C", df_result_1)
...
df_result_12 = OHE_transform("VAR_X", df_result_11)
df_result_12.head(1)
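For comparison, the same chain of indexers and encoders could presumably also be expressed as a single pyspark.ml Pipeline. This is an untested sketch and not one of the two approaches I actually ran. The helper names ohe_column_plan and build_ohe_pipeline are hypothetical, and the column naming simply mirrors OHE_transform.

```python
def ohe_column_plan(features):
    """Return (input, index_col, vector_col) names, matching OHE_transform."""
    return [(f, f + "_ohe_index", f + "_ohe_vector") for f in features]


def build_ohe_pipeline(features):
    # Imported here so the naming helper works without Spark installed.
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder
    stages = []
    for feat, index_col, vector_col in ohe_column_plan(features):
        stages.append(StringIndexer(inputCol=feat, outputCol=index_col))
        stages.append(OneHotEncoder(inputCol=index_col, outputCol=vector_col))
    return Pipeline(stages=stages)


# Hypothetical usage with the names from this post:
# model = build_ohe_pipeline(list_top_feat).fit(df_top_categ)
# result = model.transform(df_top_categ)
# result.head(1)
```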

In the first approach, at the third iteration of the for loop, when head(1) was 
supposed to be printed, the IPython notebook stayed in the "Kernel busy" state 
for several hours, until I interrupted the kernel.
The second approach got through all the transformations (note that here I 
removed the intermediate head(1) calls), but the single head(1) on the final 
result failed with an "out of memory" error, pasted below:

===============================================


df_result_12.head(1)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-29-e952d1766630> in <module>()
----> 1 df_result_12.head(1)

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
    649             rs = self.head(1)
    650             return rs[0] if rs else None
--> 651         return self.take(n)
    652
    653     @ignore_unicode_prefix

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in take(self, num)
    305         [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
    306         """
--> 307         return self.limit(num).collect()
    308
    309     @ignore_unicode_prefix

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in collect(self)
    279         """
    280         with SCCallSiteSync(self._sc) as css:
--> 281             port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
    282         rs = list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    283         cls = _create_cls(self.schema)

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 35.0 failed 1 times, most recent failure: Lost task 3.0 in stage 35.0 (TID 253, localhost): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.StringBuilder.toString(StringBuilder.java:405)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3075)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2871)
        at java.io.ObjectInputStream.readString(ObjectInputStream.java:1638)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1341)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


=================================================================

In the second approach, continuing to run cells after this error resulted in:


Traceback (most recent call last):
  File "/usr/lib/python2.7/SocketServer.py", line 295, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python2.7/SocketServer.py", line 321, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python2.7/SocketServer.py", line 334, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python2.7/SocketServer.py", line 649, in __init__
    self.handle()
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/serializers.py", line 544, in read_int
    raise EOFError
EOFError
ERROR:py4j.java_gateway:Error while sending or receiving.
Traceback (most recent call last):
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 479, in send_command
    raise Py4JError("Answer from Java side is empty")
Py4JError: Answer from Java side is empty
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server
Traceback (most recent call last):
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 425, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server
Traceback (most recent call last):
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 425, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server
Traceback (most recent call last):
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 425, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused


================================================

So, once again, it seems that I need to change some configuration parameters to 
prevent such out-of-memory errors.
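One candidate I am aware of, stated as an assumption rather than something I have verified: my reading of the configuration documentation is that in client mode spark.driver.memory has to be supplied before the driver JVM starts, for example through the --driver-memory option, and that from a notebook this can be passed via the PYSPARK_SUBMIT_ARGS environment variable. A sketch of that follows. The value "8g" is only a placeholder and submit_args is a hypothetical helper.

```python
import os


def submit_args(driver_memory):
    """Build a PYSPARK_SUBMIT_ARGS value requesting the given driver heap."""
    return "--driver-memory %s pyspark-shell" % driver_memory


# Assumption: this must run before the first SparkContext is created,
# because the JVM heap size cannot change once the JVM is running.
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args("8g")

# Afterwards the context would be created as before, for example:
# sc = SparkContext(conf=config)
```

In local mode the tasks run inside the driver JVM, so the driver heap is the one that would matter here, if this reading is right.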


Thank you very much in advance.

Camelia





