[jira] [Commented] (SPARK-26558) java.util.NoSuchElementException while saving data into HDFS using Spark

2019-01-07 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736662#comment-16736662
 ] 

Hyukjin Kwon commented on SPARK-26558:
--

The exception itself comes from {{greenplum}}. It looks like there is something 
wrong, or a bug, in {{GreenplumRowIterator}}. I don't think this is an issue 
within Spark.
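
For illustration, {{java.util.NoSuchElementException}} is what a Scala or Java iterator throws when {{next()}} is called after the underlying data is exhausted, so a row iterator in the connector that advances past its source would fail exactly this way. A minimal sketch (hypothetical code, not the connector's actual implementation):

{code:java}
// Hypothetical sketch: calling next() on an exhausted iterator reproduces the
// same java.util.NoSuchElementException reported here.
object IteratorSketch {
  def main(args: Array[String]): Unit = {
    val source = Iterator(1, 2, 3)

    // Correct usage: guard every next() with hasNext.
    while (source.hasNext) println(source.next())

    // Buggy usage: one next() too many.
    source.next() // throws java.util.NoSuchElementException: next on empty iterator
  }
}
{code}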

> java.util.NoSuchElementException while saving data into HDFS using Spark
> 
>
> Key: SPARK-26558
> URL: https://issues.apache.org/jira/browse/SPARK-26558
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Spark Submit
>Affects Versions: 2.0.0
>Reporter: Sidhartha
>Priority: Major
> Attachments: OKVMg.png, k5EWv.png
>
>
> h1. !OKVMg.png!!k5EWv.png! How to fix java.util.NoSuchElementException while saving data into HDFS using Spark?
>  
> I'm trying to ingest a Greenplum table into HDFS using the spark-greenplum reader.
> Below are the versions of Spark & Scala I am using:
> spark-core: 2.0.0
>  spark-sql: 2.0.0
>  Scala version: 2.11.8
> To do that, I wrote the following code:
>  
> {code:java}
> val conf = new SparkConf().setAppName("TEST_YEAR")
>   .set("spark.executor.heartbeatInterval", "1200s")
>   .set("spark.network.timeout", "12000s")
>   .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
>   .set("spark.shuffle.compress", "true")
>   .set("spark.shuffle.spill.compress", "true")
>   .set("spark.sql.orc.filterPushdown", "true")
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   .set("spark.kryoserializer.buffer.max", "512m")
>   .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
>   .set("spark.streaming.stopGracefullyOnShutdown", "true")
>   .set("spark.yarn.driver.memoryOverhead", "8192")
>   .set("spark.yarn.executor.memoryOverhead", "8192")
>   .set("spark.sql.shuffle.partitions", "400")
>   .set("spark.dynamicAllocation.enabled", "false")
>   .set("spark.shuffle.service.enabled", "true")
>   .set("spark.sql.tungsten.enabled", "true")
>   .set("spark.executor.instances", "12")
>   .set("spark.executor.memory", "13g")
>   .set("spark.executor.cores", "4")
>   .set("spark.files.maxPartitionBytes", "268435468")
> 
> val flagCol = "del_flag"
> val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport()
>   .config("hive.exec.dynamic.partition", "true")
>   .config("hive.exec.dynamic.partition.mode", "nonstrict")
>   .getOrCreate()
> import spark.implicits._
> 
> val dtypes = spark.read.format("jdbc").option("url", hiveMetaConURL)
>   .option("dbtable", "(select source_type, hive_type from hivemeta.types) as gpHiveDataTypes")
>   .option("user", metaUserName).option("password", metaPassword).load()
> 
> val spColsDF = spark.read.format("jdbc").option("url", hiveMetaConURL)
>   .option("dbtable", "(select source_columns, precision_columns, partition_columns from hivemeta.source_table where tablename='gpschema.empdocs') as colsPrecision")
>   .option("user", metaUserName).option("password", metaPassword).load()
> 
> val dataMapper = dtypes.as[(String, String)].collect().toMap
> val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
> val gpColumns = gpCols.split("\\|").map(e => e.split("\\:")).map(s => s(0)).mkString(",")
> val splitColumns = gpCols.split("\\|").toList
> val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
> val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
> val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
> val partCList = prtn_String_columns.split(",").map(x => col(x))
> 
> var splitPrecisionCols = precisionCols.split(",")
> for (i <- splitPrecisionCols) {
>   precisionColsText += i.concat(s"::${textType} as ").concat(s"${i}_text")
>   textList += s"${i}_text:${textType}"
> }
> 
> val pCols = precisionColsText.mkString(",")
> val allColumns = gpColumns.concat("," + pCols)
> val allColumnsSeq = allColumns.split(",").toSeq
> val allColumnsSeqC = allColumnsSeq.map(x => column(x))
> val gpColSeq = gpColumns.split(",").toSeq
> 
> def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
>     dataMapper: Map[String, String], partition_columns: Array[String], spark: SparkSession): DataFrame = {
>   val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
>     .option("url", connectionUrl)
>     .option("dbtable", "empdocs")
>     .option("dbschema", "gpschema")
>     .option("user", devUserName).option("password", devPassword)
>     .option("partitionColumn", "header_id")
>     .load()
>     .where("year=2017 and month=12")
>     .select(gpColSeq map col: _*)
>     .withColumn(flagCol, lit(0))
>   val totalCols: List[String] = splitColumns ++ textList
>   val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
>   val allCols =

[jira] [Commented] (SPARK-26558) java.util.NoSuchElementException while saving data into HDFS using Spark

2019-01-07 Thread Sidhartha (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735619#comment-16735619
 ] 

Sidhartha commented on SPARK-26558:
---

OK. I applied all the parameters as per the official documentation at 
[https://greenplum-spark.docs.pivotal.io/160/read_from_gpdb.html].
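
For reference, a minimal read following those documented parameters, with the option names taken from the code in the description and the connection URL, credentials, and output path as hypothetical placeholders, would look roughly like this:

{code:java}
import org.apache.spark.sql.SparkSession

object GreenplumReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("gp-read-sketch").getOrCreate()

    // Only the option names below come from the code in the description and
    // the referenced docs; the values are placeholders for this sketch.
    val gpDF = spark.read
      .format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
      .option("url", "jdbc:postgresql://gp-master:5432/devdb") // hypothetical URL
      .option("dbschema", "gpschema")
      .option("dbtable", "empdocs")
      .option("user", "devuser")          // hypothetical credentials
      .option("password", "devpassword")
      .option("partitionColumn", "header_id")
      .load()

    // Persist to HDFS; ORC chosen only to match the ORC settings in the description.
    gpDF.write.mode("overwrite").orc("/tmp/empdocs_orc")
    spark.stop()
  }
}
{code}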


[jira] [Commented] (SPARK-26558) java.util.NoSuchElementException while saving data into HDFS using Spark

2019-01-07 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735605#comment-16735605
 ] 

Liang-Chi Hsieh commented on SPARK-26558:
-

This looks like an issue on the spark-greenplum connector side, not in Spark.
