bq. lambda part: save_sets(part, KEY_SET_NAME, Where do you save the part to ?
For OutOfMemoryError, the last line was from Utility.scala Anything before that ? Thanks On Thu, Dec 3, 2015 at 11:47 AM, Augustus Hong <augus...@branchmetrics.io> wrote: > Hi All, > > I'm running Spark Streaming (Python) with Direct Kafka and I'm seeing that > the memory usage will slowly go up and eventually kill the job in a few > days. > > Everything runs fine at first but after a few days the job started issuing > *error: [Errno 104] Connection reset by peer , *followed by > *java.lang.OutOfMemoryError: GC overhead limit exceeded *when I tried to > access the web UI. > > I'm not using any fancy settings, pretty much just the default, and give > each executor(4 cores) 14G of memory and 40G to the driver. > > I looked through the mailing list and around the web. There were a few > streaming running out of memory issues but with no apparent solutions. If > anyone have insights into this please let me know! > > Best, > Augustus > > > ------------------------------------------------------------------------------------- > > Detailed Logs below: > > The first error I see is this: > > 15/12/02 19:05:03 INFO scheduler.DAGScheduler: Job 270661 finished: call >>> at /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py:1206, >>> took 0.560671 s >> >> 15/12/02 19:05:09 ERROR python.PythonRDD: Error while sending iterator >> >> java.net.SocketTimeoutException: Accept timed out >> >> at java.net.PlainSocketImpl.socketAccept(Native Method) >> >> at >>> java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398) >> >> at java.net.ServerSocket.implAccept(ServerSocket.java:530) >> >> at java.net.ServerSocket.accept(ServerSocket.java:498) >> >> at >>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:613) >> >> Traceback (most recent call last): >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", >>> line 62, in call >> >> r = self.func(t, *rdds) >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", >>> line 159, in <lambda> >> >> func = lambda t, rdd: old_func(rdd) >> >> File "/root/spark-projects/click-flow/click-stream.py", line 141, in >>> <lambda> >> >> keys.foreachRDD(lambda rdd: rdd.foreachPartition(lambda part: >>> save_sets(part, KEY_SET_NAME, ITEMS_PER_KEY_LIMIT_KEYS))) >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 766, in >>> foreachPartition >> >> self.mapPartitions(func).count() # Force evaluation >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1006, in >>> count >> >> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 997, in >>> sum >> >> return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 871, in >>> fold >> >> vals = self.mapPartitions(func).collect() >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 774, in >>> collect >> >> return list(_load_from_socket(port, self._jrdd_deserializer)) >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in >>> _load_from_socket >> >> for item in serializer.load_stream(rf): >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line >>> 139, in load_stream >> >> yield self._read_with_length(stream) >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line >>> 156, in _read_with_length >> >> length = read_int(stream) >> >> File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line >>> 542, in read_int >> >> length = stream.read(4) >> >> File "/usr/lib64/python2.6/socket.py", line 383, in read >> >> data = self._sock.recv(left) >> >> error: [Errno 104] Connection reset by peer >> >> 15/12/02 19:05:09 INFO scheduler.JobScheduler: Finished job streaming job >>> 1449082320000 ms.1 from job set of time 1449082320000 ms >> >> >>> >>> > And then when I try to access the web UI this error is thrown, leading me > to believe that it has something to do with memory being full: > > 15/12/02 19:43:34 WARN servlet.ServletHandler: Error for /jobs/ >> java.lang.OutOfMemoryError: GC overhead limit exceeded >> at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:64) >> at java.lang.StringBuilder.<init>(StringBuilder.java:97) >> at scala.collection.mutable.StringBuilder.<init>(StringBuilder.scala:46) >> at scala.collection.mutable.StringBuilder.<init>(StringBuilder.scala:51) >> at scala.xml.Attribute$class.toString1(Attribute.scala:96) >> at scala.xml.UnprefixedAttribute.toString1(UnprefixedAttribute.scala:16) >> at scala.xml.MetaData.buildString(MetaData.scala:202) >> at scala.xml.Utility$.serialize(Utility.scala:216) >> at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256) >> at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256) >> at >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> at scala.xml.Utility$.sequenceToXML(Utility.scala:256) >> at scala.xml.Utility$.serialize(Utility.scala:227) >> at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256) >> at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256) >> at >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> at scala.xml.Utility$.sequenceToXML(Utility.scala:256) >> at scala.xml.Utility$.serialize(Utility.scala:227) >> at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256) >> at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256) >> at >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> at scala.xml.Utility$.sequenceToXML(Utility.scala:256) >> at scala.xml.Utility$.serialize(Utility.scala:227) >> at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256) >> at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256) >> at >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> at scala.xml.Utility$.sequenceToXML(Utility.scala:256) >> at scala.xml.Utility$.serialize(Utility.scala:227) > > > > > > -- > [image: Branch Metrics mobile deep linking] <http://branch.io/>* Augustus > Hong* > Data Analytics | Branch Metrics > m 650-391-3369 | e augus...@branch.io >