What are you trying to do? It looks like you are mixing multiple SparkContexts together.
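For context on why this fails: a ForeachWriter runs on the executors, and a SparkContext or SparkSession cannot be created or used there, which is why the job only breaks in cluster deployment. The writer should hand rows to the external system with an ordinary client instead. Below is a minimal sketch of that pattern (saveToExternalStore is a hypothetical placeholder; substitute your actual S3/JDBC client):

import org.apache.spark.sql.{ForeachWriter, Row}
import scala.collection.mutable.ArrayBuffer

class BufferingWriter extends ForeachWriter[Row] {
  // Buffer the rows of one partition; everything here must be
  // serializable because the writer is shipped to the executors.
  private val rows = ArrayBuffer.empty[Row]

  override def open(partitionId: Long, version: Long): Boolean = {
    rows.clear()
    true // true means "process this partition"
  }

  override def process(row: Row): Unit = rows += row

  override def close(errorOrNull: Throwable): Unit = {
    // Write with a plain client library, never with a new SparkSession.
    if (errorOrNull == null) saveToExternalStore(rows)
  }

  // Hypothetical helper; replace with your sink's client API.
  private def saveToExternalStore(batch: Seq[Row]): Unit = ()
}

The stream is then started with df.writeStream.foreach(new BufferingWriter).start(), and any joins or SQL should be applied to df on the driver before writeStream, not inside the writer.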
On Fri, Nov 4, 2016 at 5:15 PM, Lev Tsentsiper <lev.tsentsi...@numerify.com> wrote:
> My code throws an exception when I am trying to create a new Dataset from
> within a StreamWriter sink.
>
> Simplified version of the code:
>
> val df = sparkSession.readStream
>   .format("json")
>   .option("nullValue", " ")
>   .option("headerFlag", "true")
>   .option("spark.sql.shuffle.partitions", 1)
>   .option("mode", "FAILFAST")
>   .schema(tableSchema)
>   .load(s"s3n://....")
>
> df.writeStream
>   //TODO Switch to S3 location
>   //.option("checkpointLocation", s"$input/$tenant/checkpoints/")
>   .option("checkpointLocation", "/tmp/checkpoints/test1")
>   .foreach(new ForeachWriter() {
>     ....
>     override def close() = {
>       val sparkSession = SparkSession.builder()
>         .config(new SparkConf()
>           .setAppName("zzz")
>           .set("spark.app.id", "xxx")
>           .set("spark.master", "local[1]"))
>         .getOrCreate()
>
>       val data = sparkSession.createDataset(rowList)
>       data.createOrReplaceTempView(tempTableName)
>       val sql = sparkSession.sql("....")
>       sql.repartition(1).foreachPartition(iter => {})
>     }
>   });
>
> This code throws an exception:
>
> java.util.NoSuchElementException: key not found: 202
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:59)
>   at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
>   at org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
>   at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:421)
>   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
>   at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
>   at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
>   at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
>   at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
>   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>   at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:101)
>   at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
>   at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
>   at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
>   at org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:150)
>   at org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(ExistingRDD.scala:217)
>   at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>   at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
>   at org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:150)
>   at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
>   at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>   at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
>   at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
>   at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
>   at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>   at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
>   at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:30)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:309)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:347)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:86)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:122)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:113)
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
>   at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:113)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:100)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:36)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:88)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
>   at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
>   at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2347)
>   at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2344)
>   at com.numerify.platform.pipeline.TableWriter$$anonfun$close$5.apply(TableWriter.scala:109)
>
> This code works when run locally, but fails in cluster deployment.
> Can anyone suggest a better way to handle the creation and processing of a
> Dataset within a ForeachWriter?
>
> Thank you