> lambda part: save_sets(part, KEY_SET_NAME,

Where do you save the part to?
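
For context on the question: a per-partition sink usually looks something
like the sketch below (hypothetical save_sets body; the real one wasn't
posted, and Redis is only an assumed store). If it opens a connection or
buffers data without releasing it per partition, that would explain memory
creeping up over days.

    # Hypothetical sketch of save_sets; the actual implementation was not
    # posted, and Redis is only an assumed sink. The point is the shape:
    # open per partition, write, release before returning, so nothing
    # accumulates across streaming batches.
    import redis

    def save_sets(part, set_name, items_per_key_limit):
        r = redis.StrictRedis(host="localhost", port=6379)
        for key in part:
            # Cap per-key state so it cannot grow without bound.
            if r.scard(set_name) < items_per_key_limit:
                r.sadd(set_name, key)
        # redis-py pools connections, but any raw sockets, files, or
        # in-memory buffers opened here must be closed/flushed per partition.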

For the OutOfMemoryError, the last line of the trace is from Utility.scala.
Is there anything before that in the log?
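
Also, to rule out configuration: I'm assuming the 14G / 40G figures are set
roughly as in the sketch below (not your actual launch code). One caveat:
driver memory generally has to be passed on the spark-submit command line,
since the driver JVM is already running by the time SparkConf is read.

    # Sketch of the assumed memory settings (PySpark). Executor memory can
    # be set in SparkConf, but driver memory must usually be given to
    # spark-submit directly (--driver-memory 40g) because the driver JVM
    # has already started by the time this code runs.
    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("click-stream")
            .set("spark.executor.cores", "4")
            .set("spark.executor.memory", "14g"))
    sc = SparkContext(conf=conf)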

Thanks

On Thu, Dec 3, 2015 at 11:47 AM, Augustus Hong <augus...@branchmetrics.io>
wrote:

> Hi All,
>
> I'm running Spark Streaming (Python) with Direct Kafka, and memory usage
> slowly climbs until it kills the job after a few days.
>
> Everything runs fine at first, but after a few days the job starts issuing
> *error: [Errno 104] Connection reset by peer*, followed by
> *java.lang.OutOfMemoryError: GC overhead limit exceeded* when I try to
> access the web UI.
>
> I'm not using any fancy settings, pretty much just the defaults; I give
> each executor (4 cores) 14G of memory and the driver 40G.
>
> I looked through the mailing list and around the web. There were a few
> reports of streaming jobs running out of memory, but with no apparent
> solutions. If anyone has insights into this, please let me know!
>
> Best,
> Augustus
>
>
> -------------------------------------------------------------------------------------
>
> Detailed Logs below:
>
> The first error I see is this:
>
>> 15/12/02 19:05:03 INFO scheduler.DAGScheduler: Job 270661 finished: call at /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py:1206, took 0.560671 s
>> 15/12/02 19:05:09 ERROR python.PythonRDD: Error while sending iterator
>> java.net.SocketTimeoutException: Accept timed out
>>   at java.net.PlainSocketImpl.socketAccept(Native Method)
>>   at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
>>   at java.net.ServerSocket.implAccept(ServerSocket.java:530)
>>   at java.net.ServerSocket.accept(ServerSocket.java:498)
>>   at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:613)
>> Traceback (most recent call last):
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
>>     r = self.func(t, *rdds)
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in <lambda>
>>     func = lambda t, rdd: old_func(rdd)
>>   File "/root/spark-projects/click-flow/click-stream.py", line 141, in <lambda>
>>     keys.foreachRDD(lambda rdd: rdd.foreachPartition(lambda part: save_sets(part, KEY_SET_NAME, ITEMS_PER_KEY_LIMIT_KEYS)))
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 766, in foreachPartition
>>     self.mapPartitions(func).count()  # Force evaluation
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1006, in count
>>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 997, in sum
>>     return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 871, in fold
>>     vals = self.mapPartitions(func).collect()
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 774, in collect
>>     return list(_load_from_socket(port, self._jrdd_deserializer))
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
>>     for item in serializer.load_stream(rf):
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
>>     yield self._read_with_length(stream)
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length
>>     length = read_int(stream)
>>   File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 542, in read_int
>>     length = stream.read(4)
>>   File "/usr/lib64/python2.6/socket.py", line 383, in read
>>     data = self._sock.recv(left)
>> error: [Errno 104] Connection reset by peer
>> 15/12/02 19:05:09 INFO scheduler.JobScheduler: Finished job streaming job 1449082320000 ms.1 from job set of time 1449082320000 ms
>
> And then, when I try to access the web UI, this error is thrown, leading me
> to believe it has something to do with memory being full:
>
>> 15/12/02 19:43:34 WARN servlet.ServletHandler: Error for /jobs/
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>   at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:64)
>>   at java.lang.StringBuilder.<init>(StringBuilder.java:97)
>>   at scala.collection.mutable.StringBuilder.<init>(StringBuilder.scala:46)
>>   at scala.collection.mutable.StringBuilder.<init>(StringBuilder.scala:51)
>>   at scala.xml.Attribute$class.toString1(Attribute.scala:96)
>>   at scala.xml.UnprefixedAttribute.toString1(UnprefixedAttribute.scala:16)
>>   at scala.xml.MetaData.buildString(MetaData.scala:202)
>>   at scala.xml.Utility$.serialize(Utility.scala:216)
>>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>   at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
>>   at scala.xml.Utility$.serialize(Utility.scala:227)
>>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>   at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
>>   at scala.xml.Utility$.serialize(Utility.scala:227)
>>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>   at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
>>   at scala.xml.Utility$.serialize(Utility.scala:227)
>>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>   at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
>>   at scala.xml.Utility$.serialize(Utility.scala:227)
>
>
> --
> *Augustus Hong* <http://branch.io/>
> Data Analytics | Branch Metrics
> m 650-391-3369 | e augus...@branch.io
>
