Sidhartha updated SPARK-26558:
------------------------------
    Attachment: OKVMg.png

java.util.NoSuchElementException while saving data into HDFS using Spark
-------------------------------------------------------------------------

                 Key: SPARK-26558
                 URL: https://issues.apache.org/jira/browse/SPARK-26558
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, Spark Submit
    Affects Versions: 2.0.0
            Reporter: Sidhartha
            Priority: Major
         Attachments: OKVMg.png, k5EWv.png

h1. How to fix java.util.NoSuchElementException while saving data into HDFS using Spark?

!OKVMg.png! !k5EWv.png!

I'm trying to ingest a Greenplum table into HDFS using the spark-greenplum reader.

Below are the versions of Spark and Scala I am using:
spark-core: 2.0.0
spark-sql: 2.0.0
Scala version: 2.11.8

To do that, I wrote the following code:

{code:java}
// Note: identifiers such as hiveMetaConURL, metaUserName, metaPassword, connectionUrl,
// devUserName, devPassword, textType, precisionColsText and textList are defined
// elsewhere in the application and are not shown in this snippet.
val conf = new SparkConf().setAppName("TEST_YEAR")
  .set("spark.executor.heartbeatInterval", "1200s")
  .set("spark.network.timeout", "12000s")
  .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
  .set("spark.shuffle.compress", "true")
  .set("spark.shuffle.spill.compress", "true")
  .set("spark.sql.orc.filterPushdown", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "512m")
  .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.yarn.driver.memoryOverhead", "8192")
  .set("spark.yarn.executor.memoryOverhead", "8192")
  .set("spark.sql.shuffle.partitions", "400")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.sql.tungsten.enabled", "true")
  .set("spark.executor.instances", "12")
  .set("spark.executor.memory", "13g")
  .set("spark.executor.cores", "4")
  .set("spark.files.maxPartitionBytes", "268435468")

val flagCol = "del_flag"
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()
import spark.implicits._

val dtypes = spark.read.format("jdbc").option("url", hiveMetaConURL)
  .option("dbtable", "(select source_type, hive_type from hivemeta.types) as gpHiveDataTypes")
  .option("user", metaUserName).option("password", metaPassword)
  .load()

val spColsDF = spark.read.format("jdbc").option("url", hiveMetaConURL)
  .option("dbtable", "(select source_columns, precision_columns, partition_columns from hivemeta.source_table where tablename='gpschema.empdocs') as colsPrecision")
  .option("user", metaUserName).option("password", metaPassword)
  .load()

val dataMapper = dtypes.as[(String, String)].collect().toMap
val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
val gpColumns = gpCols.split("\\|").map(e => e.split("\\:")).map(s => s(0)).mkString(",")
val splitColumns = gpCols.split("\\|").toList
val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
val partCList = prtn_String_columns.split(",").map(x => col(x))

var splitPrecisionCols = precisionCols.split(",")
for (i <- splitPrecisionCols) {
  precisionColsText += i.concat(s"::${textType} as ").concat(s"${i}_text")
  textList += s"${i}_text:${textType}"
}
val pCols = precisionColsText.mkString(",")
val allColumns = gpColumns.concat("," + pCols)
val allColumnsSeq = allColumns.split(",").toSeq
val allColumnsSeqC = allColumnsSeq.map(x => column(x))
val gpColSeq = gpColumns.split(",").toSeq

def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
                   dataMapper: Map[String, String], partition_columns: Array[String],
                   spark: SparkSession): DataFrame = {
  val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
    .option("url", connectionUrl)
    .option("dbtable", "empdocs")
    .option("dbschema", "gpschema")
    .option("user", devUserName).option("password", devPassword)
    .option("partitionColumn", "header_id")
    .load()
    .where("year=2017 and month=12")
    .select(gpColSeq map col: _*)
    .withColumn(flagCol, lit(0))

  val totalCols: List[String] = splitColumns ++ textList
  val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
  val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
  val resultDF = yearDF.select(allCols: _*)
  val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
  val finalDF = stringColumns.foldLeft(resultDF) { (tempDF, colName) =>
    tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
  }
  finalDF
}

val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
dataDF.write.format("csv").save("hdfs://usrdev/apps/hive/warehouse/empdocs/")
{code}

When I submit the job, I can see that the tasks for the following lines complete:

{code:java}
val dataMapper = dtypes.as[(String, String)].collect().toMap
val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
{code}

Once the task of saving the prepared dataframe starts, which is:

{noformat}
dataDF.write.format("csv").save("hdfs://usrdev/apps/hive/warehouse/empdocs/")
{noformat}

the job ends with the exception:

{noformat}
java.util.NoSuchElementException
{noformat}

I am submitting the job using the below spark-submit command:

{code:java}
SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.YearPartition \
  --master=yarn \
  --conf spark.ui.port=4090 \
  --driver-class-path /home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar \
  --conf spark.jars=/home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar \
  --executor-cores 3 \
  --executor-memory 13G \
  --keytab /home/hdpdevusr/hdpdevusr.keytab \
  --principal hdpdev...@usrdev.com \
  --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
  --name Splinter \
  --conf spark.executor.extraClassPath=/home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar \
  splinter_2.11-0.1.jar
{code}

I can see that the command launches the executors as per the numbers specified in the code, which is 12 executors with 4 cores each.
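Note that the SparkConf above sets spark.executor.instances=12 and spark.executor.cores=4, while the spark-submit command passes --executor-cores 3 and --executor-memory 13G. A quick way to confirm which values the driver actually resolved is to print the executor-related configuration after the SparkSession is created; a minimal sketch (assuming it is placed right after the getOrCreate() call above):

{code:java}
// Minimal sketch (assumption: runs right after the SparkSession above is built).
// Prints the executor-related settings Spark actually resolved, so the values set
// in SparkConf can be compared with those passed on spark-submit.
spark.sparkContext.getConf.getAll
  .filter { case (key, _) =>
    key.startsWith("spark.executor.") || key.startsWith("spark.dynamicAllocation.")
  }
  .sortBy(_._1)
  .foreach { case (key, value) => println(s"$key = $value") }
{code}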
Only 5 out of 48 tasks will complete and the job ends with the exception:

{code:java}
[Stage 5:> (0 + 48) / 64]18/12/27 10:29:10 WARN TaskSetManager: Lost task 6.0 in stage 5.0 (TID 11, executor 11): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at io.pivotal.greenplum.spark.jdbc.Jdbc$.copyTable(Jdbc.scala:43)
    at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.liftedTree1$1(GreenplumRowIterator.scala:110)
    at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.<init>(GreenplumRowIterator.scala:109)
    at io.pivotal.greenplum.spark.GreenplumRDD.compute(GreenplumRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job 5 cancelled because killed via the Web UI
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1457)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1446)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1439)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1439)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
    at org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1439)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1701)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
    ... 44 more
18/12/27 10:30:53 WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:67)
18/12/27 10:30:53 ERROR Utils: Uncaught exception in thread pool-6-thread-1
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1249)
    at java.lang.Thread.join(Thread.java:1323)
    at org.apache.spark.scheduler.LiveListenerBus.stop(LiveListenerBus.scala:199)
    at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1919)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1918)
    at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1948)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}

I don't understand where it went wrong, whether in the code or in any configuration applied to the job.

I posted the same question on Stack Overflow; the executor screenshots can also be seen there:
https://stackoverflow.com/questions/54002002/how-to-fix-java-util-nosuchelementexception-while-saving-data-into-hdfs-using-sp/54002423?noredirect=1#comment94843141_54002423

Could anyone let me know how to fix this exception?
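For reference on the exception type itself: java.util.NoSuchElementException: None.get is what Scala throws when .get is called on an empty Option, which is what the connector's Jdbc.copyTable appears to hit at Jdbc.scala:43. The snippet below is only a minimal, hypothetical illustration of that failure mode and of defensive alternatives; it is not the Greenplum connector's actual code.

{code:java}
// Minimal, hypothetical illustration (not the connector's code): calling .get on an
// empty Option is what raises java.util.NoSuchElementException: None.get,
// as seen in the stack trace above.
val found: Option[String] = None        // a lookup that returned nothing

// found.get                            // would throw NoSuchElementException: None.get

// Defensive alternatives: supply a default, or handle both cases explicitly.
val withDefault: String = found.getOrElse("fallback-value")
val handled: String = found match {
  case Some(v) => v
  case None    => "no value found"
}
println(s"$withDefault / $handled")
{code}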