My code throws an exception when I am trying to create new DataSet from
within SteamWriter sink 

Simplified version of the code 

  val df = sparkSession.readStream 
    .format("json") 
    .option("nullValue", " ") 
    .option("headerFlag", "true") 
    .option("spark.sql.shuffle.partitions", 1) 
    .option("mode", "FAILFAST") 
    .schema(tableSchema) 
    .load(s"s3n://....") 
df.writeStream 
    //TODO Switch to S3 location 
    //.option("checkpointLocation", s"$input/$tenant/checkpoints/") 
    .option("checkpointLocation", "/tmp/checkpoins/test1") 
    .foreach(new ForwachWriter() { 
               .... 
     override def close() = { 
        val sparkSession = SparkSession.builder() 
          .config(new SparkConf() 
            .setAppName("zzz").set("spark.app.id", ""xxx) 
            .set("spark.master", "local[1]") 
          ).getOrCreate() 

            val data = sparkSession.createDataset(rowList). 
            .createOrReplaceTempView(tempTableName) 
             val sql =   sparkSession.sql("....") 
            sql.repartition(1).foreachPartition(iter=> {}) 
     } 

}); 
  
This code throws an exception 

java.util.NoSuchElementException: key not found: 202 
        at scala.collection.MapLike$class.default(MapLike.scala:228) 
        at scala.collection.AbstractMap.default(Map.scala:59) 
        at scala.collection.mutable.HashMap.apply(HashMap.scala:65) 
        at
org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
 
        at
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:421) 
        at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
 
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253) 
        at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
 
        at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
 
        at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65) 
        at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89) 
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:101)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
 
        at
org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:150)
 
        at
org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(ExistingRDD.scala:217)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:150)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
 
        at
org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:30)
 
        at
org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:309)
 
        at
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:347)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
        at
org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:86)
 
        at
org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:122)
 
        at
org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:113)
 
        at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49) 
        at
org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
        at
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
 
        at
org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:113) 
        at
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
        at
org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:100)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
        at
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
 
        at
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:36)
 
        at
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
        at
org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:88)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
        at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
 
        at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) 
        at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2347) 
        at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2344) 
        at
com.numerify.platform.pipeline.TableWriter$$anonfun$close$5.apply(TableWriter.scala:109)
 



This code works when run locally, but fails in cluster deployment. 
Can anyone suggest better way to handle creation and processing of DataSet
within ForeachWriter? 

Thanks you 
  




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-util-NoSuchElementException-when-trying-to-use-dataset-from-worker-tp28023.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to