[ https://issues.apache.org/jira/browse/SPARK-27828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-27828.
----------------------------------
    Resolution: Cannot Reproduce

spark job hangs when kryo.serializers.FieldSerializer is called under multi-executor-cores settings
---------------------------------------------------------------------------------------------------

                Key: SPARK-27828
                URL: https://issues.apache.org/jira/browse/SPARK-27828
            Project: Spark
         Issue Type: Bug
         Components: Spark Core
   Affects Versions: 1.6.0
           Reporter: Itsuki Toyota
           Priority: Major

Hi,

I found that a Spark job hangs when kryo.serializers.FieldSerializer is invoked under multi-executor-cores settings. Concretely, when I load facts (e.g., <Obama> <wasBornIn> <America>) from a gzipped .nt text file, convert them to an RDD[Triple] [0], and evaluate the result, the job hangs under the specific conditions below.

A reproducible procedure is as follows:

1) Create the .nt file

{code:java}
# BSBM data generator
# (https://sourceforge.net/projects/bsbmtools/files/bsbmtools/bsbmtools-0.2/bsbmtools-v0.2.zip/download)
$ ./generate -fc -s nt -fn dataset_10M -pc 28480
{code}

2) Compress the .nt file

{code:java}
$ spark-shell
import org.apache.hadoop.io.compress.GzipCodec
sc.textFile("dataset_10M.nt").repartition(100).saveAsTextFile("dataset_10M_gzip_100", classOf[GzipCodec])
{code}

3) Load the .nt file and evaluate it (i.e., RDD.count) after a repartition

Code:

{code:java}
package jp.hang.spark

import java.io.StringReader

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.jena.graph.Triple
import org.apache.jena.rdf.model.{Model, ModelFactory}

object MyTest {
  private def argParser(args: Array[String]): ArgConfig = {
    val optionParser = new scopt.OptionParser[ArgConfig]("spark-submit <spark commands>") {
      head("AAAA")
      opt[String]('i', "nt-path") required() valueName "<Path>" action { (x, c) =>
        c.copy(ntPath = x)
      } text "Path to the ntriple"
    }
    optionParser.parse(args, ArgConfig()) match {
      case Some(config) => config
      case None => sys.exit(-1) // arguments are bad, error message will have been displayed
    }
  }

  private case class ArgConfig(ntPath: String = "")

  def main(args: Array[String]) {
    val arguments: ArgConfig = argParser(args)
    val conf = new SparkConf().setAppName("MyTest")
    val sc = new SparkContext(conf)
    val rawTriples: RDD[String] = sc.textFile(arguments.ntPath)
    convertRawTriple(rawTriples).repartition(5000).count
  }

  private def convertRawTriple(rawTriples: RDD[String]): RDD[Triple] = {
    rawTriples.mapPartitions { iter =>
      iter.map { tripleText =>
        // Parse one N-Triples line into a Jena Triple via a throwaway Model.
        val model: Model = ModelFactory.createDefaultModel
        val r: StringReader = new StringReader(tripleText)
        model.read(r, null, "N-TRIPLE") //scalastyle:ignore null
        val stmt = model.listStatements.next()
        val triple = Triple.create(stmt.getSubject.asNode, stmt.getPredicate.asNode, stmt.getObject.asNode)
        r.close()
        model.close()
        triple
      }
    }
  }
}
{code}
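The configuration that enables Kryo is not shown above; since Spark 1.6 defaults to JavaSerializer, something along the following lines must have been set for KryoDeserializationStream to appear in the stack trace below. This is a minimal sketch of the assumed setup, not code from the report:

{code:java}
import org.apache.spark.{SparkConf, SparkContext}

// Assumed setup (not shown in the report): Spark 1.6 uses JavaSerializer
// unless spark.serializer is overridden, so Kryo must have been enabled
// roughly like this.
val conf = new SparkConf()
  .setAppName("MyTest")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// No serializer is registered for org.apache.jena.graph.Triple here, so Kryo
// falls back to its generic FieldSerializer for Triple, which is the
// serializer seen in the hung thread's stack.
val sc = new SparkContext(conf)
{code}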
These commands show that with executor-cores set to 1 Spark finishes the job, but with executor-cores set to 5 Spark hangs, regardless of how the input file is stored (gzip-compressed or uncompressed "as-is"):

{code:java}
$ spark-submit --executor-cores 1 --num-executors 50 --driver-memory 12G --executor-memory 3G spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M.nt # ok
$ spark-submit --executor-cores 5 --num-executors 50 --driver-memory 12G --executor-memory 3G spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M.nt # hang
$ spark-submit --executor-cores 1 --num-executors 50 --driver-memory 12G --executor-memory 3G spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M_gzip_100 # ok
$ spark-submit --executor-cores 5 --num-executors 50 --driver-memory 12G --executor-memory 3G spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M_gzip_100 # hang
{code}

When Spark hangs, jstack shows that kryo.serializers.FieldSerializer cannot finish its task:

{code:java}
"Executor task launch worker-9" #164 daemon prio=5 os_prio=0 tid=0x00007fcb2cd5a000 nid=0x3416 in Object.wait() [0x00007fcaf0f48000]
   java.lang.Thread.State: RUNNABLE
    at sun.reflect.GeneratedSerializationConstructorAccessor325.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
    at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:171)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:152)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1196)
    at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply$mcV$sp(DiskStore.scala:81)
    at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
    at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:82)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:798)
    - locked <0x00000007b736da08> (a org.apache.spark.storage.BlockInfo)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:645)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - <0x00000007b83e8768> (a java.util.concurrent.ThreadPoolExecutor$Worker)
{code}

I also found that Spark does not hang when I use a serializer other than kryo.serializers.FieldSerializer.
For example, this serializer is based on STriple [1]:

{code:java}
class TripleSerializer extends KryoSerializer[Triple] {
  override def write(kryo: Kryo, output: Output, triple: Triple) {
    val protocol: TProtocol = TRDF.protocol(output)
    val tterm: RDF_Term = new RDF_Term()
    SerializerRDF.write(protocol, tterm, triple.getSubject)
    SerializerRDF.write(protocol, tterm, triple.getPredicate)
    SerializerRDF.write(protocol, tterm, triple.getObject)
    TRDF.flush(protocol)
  }

  override def read(kryo: Kryo, input: Input, objClass: Class[Triple]): Triple = {
    val protocol: TProtocol = TRDF.protocol(input)
    val tterm: RDF_Term = new RDF_Term()
    val s: Node = SerializerRDF.read(protocol, tterm)
    val p: Node = SerializerRDF.read(protocol, tterm)
    val o: Node = SerializerRDF.read(protocol, tterm)
    Triple.create(s, p, o)
  }
}
{code}

and this one is based on SNode [2]:

{code:java}
class NodeSerializer extends KryoSerializer[Node] {
  override def write(kryo: Kryo, output: Output, obj: Node) {
    output.writeString(FmtUtils.stringForNode(obj))
  }

  override def read(kryo: Kryo, input: Input, objClass: Class[Node]): Node = {
    val s = input.readString
    RiotLib.parse(s)
  }
}
{code}

A sketch of how these serializers could be registered with Spark follows the references below.

Cheers,

[0] Jena Triple (I used version 3.6.0): https://jena.apache.org/documentation/javadoc/jena/org/apache/jena/graph/Triple.html
[1] https://github.com/apache/jena/blob/jena-3.6.0/jena-arq/src/main/java/org/apache/jena/riot/system/STriple.java
[2] https://github.com/apache/jena/blob/jena-3.6.0/jena-arq/src/main/java/org/apache/jena/riot/system/SNode.java
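As referenced above, here is one way the custom serializers might be wired into the job. This wiring is an assumption for illustration and does not appear in the report: KryoRegistrator, spark.serializer, and spark.kryo.registrator are standard Spark APIs, but the JenaKryoRegistrator class itself is hypothetical (assumed to live in the jp.hang.spark package alongside MyTest):

{code:java}
import com.esotericsoftware.kryo.Kryo
import org.apache.jena.graph.{Node, Triple}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator (not part of the report): replaces Kryo's default
// FieldSerializer with the custom TripleSerializer/NodeSerializer above.
class JenaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Triple], new TripleSerializer)
    // Node is abstract in Jena; addDefaultSerializer also covers its concrete
    // subclasses (Node_URI, Node_Literal, ...).
    kryo.addDefaultSerializer(classOf[Node], new NodeSerializer)
  }
}

// Enabling Kryo plus the registrator on the job configuration:
object KryoSetup {
  def conf: SparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "jp.hang.spark.JenaKryoRegistrator")
}
{code}

With a registrator like this in place, cached or shuffled Triple values would go through SerializerRDF rather than Kryo's reflective field-by-field FieldSerializer path shown in the stack trace.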