Hi,

I'm reading data stored in S3, aggregating it, and storing the results in
Cassandra using a Spark job.

When I run the job with approximately 3 million records (about 3-4 GB of data)
stored in text files, I get the following error:

(11529/14925)
15/04/10 19:32:43 INFO TaskSetManager: Starting task 11609.0 in stage 4.0 (TID 56384, spark-slaves-test-cluster-k0b6.c.silver-argon-837.internal, PROCESS_LOCAL, 134
15/04/10 19:32:58 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:2367)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
    at java.lang.StringBuilder.append(StringBuilder.java:204)
    at java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3143)
    at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3051)
    at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
    at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
    at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)

This error occurs in the final step of my script, when I'm storing the
processed records in Cassandra.

My memory per node is 10 GB, which means that *all my records should fit on
one machine*.
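
Looking at the trace, the error is raised inside the [sparkDriver] ActorSystem
rather than on an executor, so I'm wondering whether the driver heap (and not
just the 10 GB per worker) is what's running out. A rough sketch of how I'd
try bumping it, with 8g as a placeholder value rather than my current setting:

# Sketch only: give the driver more heap before the SparkContext is created.
# "8g" and the app name are placeholders, not the job's current settings.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("s3-to-cassandra-load")    # hypothetical app name
        .set("spark.driver.memory", "8g"))     # in client mode this may have to be
                                               # passed on the spark-submit command
                                               # line instead (--driver-memory 8g),
                                               # since the driver JVM is already
                                               # running when this line executes
sc = SparkContext(conf=conf)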

The script is in pyspark and I'm using a cluster with:

   - *Workers:* 5
   - *Cores:* 80 Total, 80 Used
   - *Memory:* 506.5 GB Total, 40.0 GB Used

Here is the relevant part of the code, for reference:

from cassandra.cluster import Cluster  # DataStax cassandra-driver

def connectAndSave(partition):
    # Open one Cassandra session per partition and reuse it for every record
    cluster = Cluster(['10.240.1.17'])
    dbsession = cluster.connect("load_test")
    ret = map(lambda x: saveUserData(x, dbsession), partition)
    dbsession.shutdown()
    cluster.shutdown()


res = sessionsRdd.foreachPartition(lambda partition: connectAndSave(partition))
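
Would it make a difference to consume the partition iterator directly instead
of building the mapped result, something like the sketch below? (saveUserData
is the same helper as above; nothing else here is new.)

from cassandra.cluster import Cluster

def connectAndSave(partition):
    cluster = Cluster(['10.240.1.17'])
    dbsession = cluster.connect("load_test")
    try:
        # Walk the partition iterator one record at a time instead of
        # materializing the whole mapped result in memory.
        for record in partition:
            saveUserData(record, dbsession)
    finally:
        dbsession.shutdown()
        cluster.shutdown()

sessionsRdd.foreachPartition(connectAndSave)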
