[jira] [Commented] (SPARK-26558) java.util.NoSuchElementException while saving data into HDFS using Spark
[ https://issues.apache.org/jira/browse/SPARK-26558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736662#comment-16736662 ] Hyukjin Kwon commented on SPARK-26558: -- The exception itself comes from {{greenplum}}. It looks like there is something wrong, or a bug, in {{GreenplumRowIterator}}. I don't think this is an issue within Spark.

> java.util.NoSuchElementException while saving data into HDFS using Spark
>
> Key: SPARK-26558
> URL: https://issues.apache.org/jira/browse/SPARK-26558
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Spark Submit
> Affects Versions: 2.0.0
> Reporter: Sidhartha
> Priority: Major
> Attachments: OKVMg.png, k5EWv.png
>
> h1. How to fix java.util.NoSuchElementException while saving data into HDFS using Spark?
>
> I'm trying to ingest a Greenplum table into HDFS using the spark-greenplum reader.
> Below are the versions of Spark and Scala I am using:
> spark-core: 2.0.0
> spark-sql: 2.0.0
> Scala version: 2.11.8
> To do that, I wrote the following code:

{code:java}
val conf = new SparkConf().setAppName("TEST_YEAR")
  .set("spark.executor.heartbeatInterval", "1200s")
  .set("spark.network.timeout", "12000s")
  .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
  .set("spark.shuffle.compress", "true")
  .set("spark.shuffle.spill.compress", "true")
  .set("spark.sql.orc.filterPushdown", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "512m")
  .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.yarn.driver.memoryOverhead", "8192")
  .set("spark.yarn.executor.memoryOverhead", "8192")
  .set("spark.sql.shuffle.partitions", "400")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.sql.tungsten.enabled", "true")
  .set("spark.executor.instances", "12")
  .set("spark.executor.memory", "13g")
  .set("spark.executor.cores", "4")
  .set("spark.files.maxPartitionBytes", "268435468")

val flagCol = "del_flag"
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()
import spark.implicits._

val dtypes = spark.read.format("jdbc").option("url", hiveMetaConURL)
  .option("dbtable", "(select source_type, hive_type from hivemeta.types) as gpHiveDataTypes")
  .option("user", metaUserName).option("password", metaPassword).load()

val spColsDF = spark.read.format("jdbc").option("url", hiveMetaConURL)
  .option("dbtable", "(select source_columns, precision_columns, partition_columns from hivemeta.source_table where tablename='gpschema.empdocs') as colsPrecision")
  .option("user", metaUserName).option("password", metaPassword).load()

val dataMapper = dtypes.as[(String, String)].collect().toMap
val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
val gpColumns = gpCols.split("\\|").map(e => e.split("\\:")).map(s => s(0)).mkString(",")
val splitColumns = gpCols.split("\\|").toList
val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
val partCList = prtn_String_columns.split(",").map(x => col(x))
var splitPrecisionCols = precisionCols.split(",")
for (i <- splitPrecisionCols) {
  precisionColsText += i.concat(s"::${textType} as ").concat(s"${i}_text")
  textList += s"${i}_text:${textType}"
}
val pCols = precisionColsText.mkString(",")
val allColumns = gpColumns.concat("," + pCols)
val allColumnsSeq = allColumns.split(",").toSeq
val allColumnsSeqC = allColumnsSeq.map(x => column(x))
val gpColSeq = gpColumns.split(",").toSeq

def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
    dataMapper: Map[String, String], partition_columns: Array[String],
    spark: SparkSession): DataFrame = {
  val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
    .option("url", connectionUrl)
    .option("dbtable", "empdocs")
    .option("dbschema", "gpschema")
    .option("user", devUserName).option("password", devPassword)
    .option("partitionColumn", "header_id")
    .load()
    .where("year=2017 and month=12")
    .select(gpColSeq map col: _*)
    .withColumn(flagCol, lit(0))
  val totalCols: List[String] = splitColumns ++ textList
  val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
  val allCols =
{code}
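For context on the comment above: {{java.util.NoSuchElementException}} is what a Scala or Java iterator throws when {{next()}} is called after the underlying data is exhausted, so a connector row iterator such as {{GreenplumRowIterator}} whose {{hasNext}} misjudges how much data remains would surface exactly this error mid-save. A minimal sketch of that failure mode in plain Scala ({{RowSource}} is a hypothetical illustration, not the connector's actual code):

{code:java}
// Sketch of the failure mode: an iterator whose hasNext over-promises,
// so the final next() call hits an exhausted source and throws
// java.util.NoSuchElementException. RowSource is hypothetical.
class RowSource(rows: Vector[String]) extends Iterator[String] {
  private val underlying = rows.iterator
  private var served = 0
  // Deliberate bug for illustration: claims one element more than the
  // source actually holds (<= instead of <).
  override def hasNext: Boolean = served <= rows.length
  override def next(): String = { served += 1; underlying.next() }
}

object NoSuchElementDemo extends App {
  val it = new RowSource(Vector("row1", "row2"))
  while (it.hasNext)
    println(it.next()) // prints row1, row2, then throws NoSuchElementException
}
{code}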
[jira] [Commented] (SPARK-26558) java.util.NoSuchElementException while saving data into HDFS using Spark
[ https://issues.apache.org/jira/browse/SPARK-26558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735619#comment-16735619 ] Sidhartha commented on SPARK-26558: --- OK. I applied all the parameters as per the official documentation given at https://greenplum-spark.docs.pivotal.io/160/read_from_gpdb.html
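For reference, the read path in question boils down to a handful of connector options. A minimal sketch of such a read, using only the options already present in the reporter's snippet (the URL and credentials below are placeholders, not values from the report):

{code:java}
import org.apache.spark.sql.SparkSession

object GreenplumReadSketch extends App {
  val spark = SparkSession.builder().appName("gp-read").getOrCreate()

  // Placeholder connection details; substitute the real master host,
  // database name and credentials.
  val gpdf = spark.read
    .format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
    .option("url", "jdbc:postgresql://gpmaster:5432/mydb") // placeholder URL
    .option("dbschema", "gpschema")          // schema holding the table
    .option("dbtable", "empdocs")            // table to read
    .option("partitionColumn", "header_id")  // splits the read across executors
    .option("user", "gpuser")
    .option("password", "secret")
    .load()

  gpdf.printSchema()
}
{code}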
[jira] [Commented] (SPARK-26558) java.util.NoSuchElementException while saving data into HDFS using Spark
[ https://issues.apache.org/jira/browse/SPARK-26558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735605#comment-16735605 ] Liang-Chi Hsieh commented on SPARK-26558: - This looks like an issue on the spark-greenplum connector side, not in Spark.
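One way to test the connector-side diagnosis is to read the same table through Spark's built-in JDBC source, which bypasses {{GreenplumRowIterator}} entirely; Greenplum speaks the PostgreSQL wire protocol, so the stock PostgreSQL driver works. If this path succeeds where the connector fails, the bug is in the connector. A sketch under those assumptions (placeholder URL, bounds, credentials and output path):

{code:java}
import org.apache.spark.sql.SparkSession

object JdbcCrossCheck extends App {
  val spark = SparkSession.builder().appName("gp-jdbc-crosscheck").getOrCreate()

  // Same table, but via Spark's generic JDBC source with the PostgreSQL
  // driver, so the Greenplum connector's row iterator is never involved.
  val viaJdbc = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://gpmaster:5432/mydb") // placeholder
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "gpschema.empdocs")
    .option("partitionColumn", "header_id") // parallel reads also need bounds
    .option("lowerBound", "1")              // placeholder bounds
    .option("upperBound", "1000000")
    .option("numPartitions", "12")
    .option("user", "gpuser")
    .option("password", "secret")
    .load()
    .where("year=2017 and month=12")

  // If this write succeeds where the connector-based read fails, the
  // NoSuchElementException is coming from the connector, not Spark.
  viaJdbc.write.mode("overwrite").parquet("hdfs:///tmp/empdocs_crosscheck")
}
{code}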