SparkSQL on Hive error
Hi,

I have a partitioned Avro table in Hive that I can query fine from the Hive CLI. When using SparkSQL, I'm able to query some of the partitions, but I get an exception on others. The query is:

sqlContext.sql("select * from myTable where source='http' and date = 20150812").take(5).foreach(println)

The exception is:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, node1): java.lang.IllegalArgumentException: Error: type expected at the position 0 of 'BIGINT:INT:INT:INT:INT:string:INT:string:string:string:string:string:string:string:string:string:string:string:string:string:string:INT:INT:string:BIGINT:string:string:BIGINT:BIGINT:string:string:string:string:string:FLOAT:FLOAT:string:string:string:string:BIGINT:BIGINT:string:string:string:string:string:string:BIGINT:string:string' but 'BIGINT' is found.
  at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:348)
  at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:331)
  at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseType(TypeInfoUtils.java:392)
  at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:305)
  at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfosFromTypeString(TypeInfoUtils.java:762)
  at org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:105)
  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$4$$anonfun$9.apply(TableReader.scala:191)
  at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$4$$anonfun$9.apply(TableReader.scala:188)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)

Any pointers on what might be wrong here?

Regards,
Anand
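[Editor's note: the parse failure comes from the AvroSerDe initializing for one specific partition, which suggests that partition's column-type metadata (the uppercase 'BIGINT:INT:...' string) differs from the table-level metadata used by the partitions that work. A diagnostic sketch follows, assuming a HiveContext bound to sqlContext and the table/partition values from the query above; if your SparkSQL version rejects the PARTITION clause, the same statements can be run from the Hive CLI.]

// Compare table-level column types with the types recorded for the
// failing partition; a partition whose metadata carries uppercase type
// names (BIGINT vs bigint) would fail exactly this way in Hive's
// TypeInfoParser while other partitions query fine.
sqlContext.sql("DESCRIBE FORMATTED myTable").collect().foreach(println)

sqlContext.sql(
  "DESCRIBE FORMATTED myTable PARTITION (source='http', date=20150812)"
).collect().foreach(println)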
Re: Checkpoint file not found
Hi,

It's an application that maintains some state from the DStream using the updateStateByKey() operation. It then selects some of the records from the current batch, using some criteria over the current values and the state, and carries the remaining values over to the next batch. Following is the pseudo code:

var pending = emptyRDD
val dstream = kafkaStream
val stateStream = dstream.updateStateByKey(myfunc, partitioner, initialState)
val joinedStream = dstream.transformWith(stateStream, transformer(pending) _)
val toUpdate = joinedStream.filter(myfilter).saveToES()
val toNotUpdate = joinedStream.filter(notFilter).checkpoint(interval)
toNotUpdate.foreachRDD(rdd => pending = rdd)

Thanks

On 3 August 2015 at 13:09, Tathagata Das t...@databricks.com wrote:

Can you tell us more about the streaming app? Which DStream operations are you using?

On Sun, Aug 2, 2015 at 9:14 PM, Anand Nalya anand.na...@gmail.com wrote:

Hi,

I'm writing a Streaming application in Spark 1.3. After running for some time, I'm getting the following exception. I'm sure that no other process is modifying the HDFS file. Any idea what might be the cause of this?

15/08/02 21:24:13 ERROR scheduler.DAGSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop failed; shutting down SparkContext
java.io.FileNotFoundException: File does not exist: hdfs://node16:8020/user/anandnalya/tiered-original/e6794c2c-1c9f-414a-ae7e-e58a8f874661/rdd-5112/part-0
  at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1132)
  at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1124)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1124)
  at org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)
  at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
  at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
  at scala.Option.map(Option.scala:145)
  at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1324)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333
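[Editor's note: a variation of the pattern above that avoids depending on checkpoint files the streaming cleaner may already have removed is to checkpoint the carried-over RDD itself and force the checkpoint with an action inside foreachRDD. A minimal sketch under assumptions: ssc.checkpoint(dir) has been called, the stream carries (String, Long) pairs, and pending/myfilter are the names from the pseudo code in this thread.]

import org.apache.spark.rdd.RDD

var pending: RDD[(String, Long)] = ssc.sparkContext.emptyRDD

dstream.foreachRDD { rdd =>
  val carried = rdd.filter(myfilter).union(pending).reduceByKey(_ + _)
  carried.checkpoint()  // cut the lineage back to fresh checkpoint files...
  carried.count()       // ...and materialize them now; RDD.checkpoint is lazy,
                        // so without an action the files may never be written
  pending = carried     // later batches depend only on the new files
}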
Checkpoint file not found
Hi,

I'm writing a Streaming application in Spark 1.3. After running for some time, I'm getting the following exception. I'm sure that no other process is modifying the HDFS file. Any idea what might be the cause of this?

15/08/02 21:24:13 ERROR scheduler.DAGSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop failed; shutting down SparkContext
java.io.FileNotFoundException: File does not exist: hdfs://node16:8020/user/anandnalya/tiered-original/e6794c2c-1c9f-414a-ae7e-e58a8f874661/rdd-5112/part-0
  at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1132)
  at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1124)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1124)
  at org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)
  at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
  at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
  at scala.Option.map(Option.scala:145)
  at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1324)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
  at
Re: updateStateByKey schedule time
I also ran into a similar use case. Is this possible?

On 15 July 2015 at 18:12, Michel Hubert mich...@vsnsystemen.nl wrote:

Hi,

I want to implement a time-out mechanism in the updateStateByKey(…) routine. Is there a way to retrieve the time of the start of the batch corresponding to the call to my updateStateByKey routines? Suppose the streaming job has built up some delay; then System.currentTimeMillis() will not be the time at which the batch was scheduled. I want to retrieve the job/task schedule time of the batch for which my updateStateByKey(..) routine is called. Is this possible?

With kind regards,
Michel Hubert
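[Editor's note: for what it's worth, the batch time is exposed by the (rdd, time) overloads of transform and foreachRDD, and it reflects the scheduled time of the batch even when processing lags. A sketch, assuming a pair DStream named dstream; the tagging step is one way to make that time visible inside updateStateByKey, and the key/value names are illustrations.]

import org.apache.spark.streaming.Time

// The scheduled time of the batch, independent of processing delay:
dstream.foreachRDD { (rdd, batchTime: Time) =>
  println(s"batch scheduled at ${batchTime.milliseconds}")
}

// To use it inside updateStateByKey, tag each record with its batch time
// before the state update; the update function can then expire state
// against the tag instead of System.currentTimeMillis().
val tagged = dstream.transform { (rdd, batchTime: Time) =>
  rdd.map { case (key, value) => (key, (value, batchTime.milliseconds)) }
}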
Re: Breaking lineage and reducing stages in Spark Streaming
Thanks for the help, Dean/TD. I was able to cut the lineage with checkpointing using the following code:

dstream.countByValue().foreachRDD((rdd, time) => {
  val joined = rdd.union(current).reduceByKey(_+_, 2).leftOuterJoin(base)
  val toUpdate = joined.filter(myfilter).map(mymap)
  val toNotUpdate = joined.filter(mynotfilter).map(mymap)
  base = base.union(toUpdate).reduceByKey(_+_, 2)
  current = toNotUpdate
  if (time.isMultipleOf(duration)) {
    base.checkpoint()
    current.checkpoint()
  }
  println(toUpdate.count()) // to persistence
})

Thanks,
Anand

On 10 July 2015 at 02:16, Tathagata Das t...@databricks.com wrote:

Summarizing the main problems discussed by Dean:

1. If you have an infinitely growing lineage, bad things will eventually happen. You HAVE TO periodically (say, every 10th batch) checkpoint the information.

2. Unpersist the previous `current` RDD ONLY AFTER running an action on the `newCurrent`. Otherwise you are throwing current out of the cache before newCurrent has been computed.

Modifying Dean's example:

val newCurrent = rdd.union(current).reduceByKey(_+_)
...
// join with newCurrent
// collect or count or any action that uses newCurrent
//
// Now you can unpersist, because newCurrent has been persisted and
// won't require falling back to this cached current RDD.
current.unpersist()

On Thu, Jul 9, 2015 at 6:36 AM, Dean Wampler deanwamp...@gmail.com wrote:

I think you're complicating the cache behavior by aggressively re-using vars when temporary vals would be more straightforward. For example, newBase = newBase.unpersist()... effectively means that newBase's data is not actually cached when the subsequent .union(...) is performed, so it probably goes back to the lineage... Same with the current.unpersist logic before it.

Names are cheap, so just use local vals:

val newCurrent = rdd.union(current).reduceByKey(_+_)
current.unpersist()

Also, what happens if you omit the 2 argument for the number of partitions in reduceByKey?

Other minor points: I would change the joined, toUpdate, toNotUpdate logic to this:

val joined = current.leftOuterJoin(newBase).map(mymap).cache()
val toUpdate = joined.filter(myfilter).cache()
val toNotUpdate = joined.filter(mynotfilter).cache()

Maybe it's just for this email example, but you don't need to call collect on toUpdate before using foreach(println). If the RDD is huge, you definitely don't want to do that.

Hope this helps.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya anand.na...@gmail.com wrote:

Yes, myRDD is outside of the DStream. Following is the actual code, where newBase and current are the RDDs being updated with each batch:

val base = sc.textFile...
var newBase = base.cache()
val dstream: DStream[String] = ssc.textFileStream...
var current: RDD[(String, Long)] = sc.emptyRDD.cache()

dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
  current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)
  val joined = current.leftOuterJoin(newBase).cache()
  val toUpdate = joined.filter(myfilter).map(mymap).cache()
  val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()
  toUpdate.collect().foreach(println) // this goes to some store
  newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()
  current = toNotUpdate.cache()
  toUpdate.unpersist()
  joined.unpersist()
  rdd.unpersist()
})

Regards,
Anand

On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote:

Is myRDD outside a DStream? If so, are you persisting on each batch iteration? It should be checkpointed frequently too.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote:

The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after the union keeps the overall tuple count in myRDD fixed. Or, even with a fixed tuple count, will it keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, isn't it?

On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote:

That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel
Re: Breaking lineage and reducing stages in Spark Streaming
The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after the union keeps the overall tuple count in myRDD fixed. Or, even with a fixed tuple count, will it keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, isn't it?

On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote:

That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

Hi,

I was just wondering how you generated the second image with the charts. What product?

*From:* Anand Nalya [mailto:anand.na...@gmail.com]
*Sent:* Thursday, 9 July 2015 11:48
*To:* spark users
*Subject:* Breaking lineage and reducing stages in Spark Streaming

Hi,

I have an application in which an RDD is being updated with tuples coming from RDDs in a DStream, with the following pattern:

dstream.foreachRDD(rdd => {
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
})

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing, and the number of stages in each batch of the dstream keeps increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed.

The following figures illustrate the problem:

Job execution: https://i.imgur.com/GVHeXH3.png?1
Delays: https://i.imgur.com/1DZHydw.png?1

Is there some pattern that I can use to avoid this?

Regards,
Anand
Breaking lineage and reducing stages in Spark Streaming
Hi,

I have an application in which an RDD is being updated with tuples coming from RDDs in a DStream, with the following pattern:

dstream.foreachRDD(rdd => {
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
})

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing, and the number of stages in each batch of the dstream keeps increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed.

The following figures illustrate the problem:

Job execution: https://i.imgur.com/GVHeXH3.png?1
Delays: https://i.imgur.com/1DZHydw.png?1

Is there some pattern that I can use to avoid this?

Regards,
Anand
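[Editor's note: the pattern that eventually worked, per the replies in this thread, is to checkpoint the accumulated RDD periodically and force the checkpoint with an action, which truncates the lineage. A minimal sketch using the names from the post; the every-10th-batch cadence follows the advice given in the replies, and sc.setCheckpointDir(...) is assumed to have been called.]

import org.apache.spark.rdd.RDD

var myRDD: RDD[(String, Long)] = sc.emptyRDD
var batches = 0L

dstream.foreachRDD { rdd =>
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_ + _).cache()
  batches += 1
  if (batches % 10 == 0) {
    myRDD.checkpoint()  // truncate the ever-growing lineage...
    myRDD.count()       // ...and force it; RDD.checkpoint alone is lazy
  }
}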
Re: Breaking lineage and reducing stages in Spark Streaming
That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

Hi,

I was just wondering how you generated the second image with the charts. What product?

*From:* Anand Nalya [mailto:anand.na...@gmail.com]
*Sent:* Thursday, 9 July 2015 11:48
*To:* spark users
*Subject:* Breaking lineage and reducing stages in Spark Streaming

Hi,

I have an application in which an RDD is being updated with tuples coming from RDDs in a DStream, with the following pattern:

dstream.foreachRDD(rdd => {
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
})

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing, and the number of stages in each batch of the dstream keeps increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed.

The following figures illustrate the problem:

Job execution: https://i.imgur.com/GVHeXH3.png?1
Delays: https://i.imgur.com/1DZHydw.png?1

Is there some pattern that I can use to avoid this?

Regards,
Anand
Re: Breaking lineage and reducing stages in Spark Streaming
Yes, myRDD is outside of the DStream. Following is the actual code, where newBase and current are the RDDs being updated with each batch:

val base = sc.textFile...
var newBase = base.cache()
val dstream: DStream[String] = ssc.textFileStream...
var current: RDD[(String, Long)] = sc.emptyRDD.cache()

dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
  current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)
  val joined = current.leftOuterJoin(newBase).cache()
  val toUpdate = joined.filter(myfilter).map(mymap).cache()
  val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()
  toUpdate.collect().foreach(println) // this goes to some store
  newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()
  current = toNotUpdate.cache()
  toUpdate.unpersist()
  joined.unpersist()
  rdd.unpersist()
})

Regards,
Anand

On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote:

Is myRDD outside a DStream? If so, are you persisting on each batch iteration? It should be checkpointed frequently too.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote:

The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after the union keeps the overall tuple count in myRDD fixed. Or, even with a fixed tuple count, will it keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, isn't it?

On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote:

That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

Hi,

I was just wondering how you generated the second image with the charts. What product?

*From:* Anand Nalya [mailto:anand.na...@gmail.com]
*Sent:* Thursday, 9 July 2015 11:48
*To:* spark users
*Subject:* Breaking lineage and reducing stages in Spark Streaming

Hi,

I have an application in which an RDD is being updated with tuples coming from RDDs in a DStream, with the following pattern:

dstream.foreachRDD(rdd => {
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
})

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing, and the number of stages in each batch of the dstream keeps increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed.

The following figures illustrate the problem:

Job execution: https://i.imgur.com/GVHeXH3.png?1
Delays: https://i.imgur.com/1DZHydw.png?1

Is there some pattern that I can use to avoid this?

Regards,
Anand
[no subject]
Hi,

Suppose I have an RDD that is loaded from some file, and I also have a DStream with data coming from some stream. I want to keep unioning some of the tuples from the DStream into my RDD. For this I can use something like:

var myRDD: RDD[(String, Long)] = sc.fromText...

dstream.foreachRDD { rdd =>
  myRDD = myRDD.union(rdd.filter(myfilter))
}

My question is: for how long will Spark keep the RDDs underlying the dstream around? Is there some configuration knob that can control that?

Regards,
Anand
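[Editor's note: two settings bear on this, though whether they fully cover this use case is not confirmed in the thread: the spark.streaming.unpersist property controls whether Spark aggressively unpersists each batch's RDDs once their jobs finish, and StreamingContext.remember() extends how long generated RDDs are kept. A sketch; the app name and intervals are placeholders.]

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

// Keep batch RDDs around rather than letting Spark clean them up as soon
// as the jobs that used them finish.
val conf = new SparkConf()
  .setAppName("union-from-dstream")
  .set("spark.streaming.unpersist", "false") // don't aggressively unpersist batch RDDs

val ssc = new StreamingContext(conf, Seconds(10))
ssc.remember(Minutes(5)) // remember each batch's RDDs for at least 5 minutes

Note that holding permanent references to batch RDDs via union, as in the pattern above, still grows the lineage without bound; the "Breaking lineage" thread above discusses checkpointing to deal with that.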
Split RDD into two in a single pass
Hi,

I have an RDD that I want to split into two disjoint RDDs with a boolean function. I can do this with the following:

val rdd1 = rdd.filter(f)
val rdd2 = rdd.filter(fnot)

I'm assuming that each of the above statements will traverse the RDD once, thus resulting in two passes. Is there a way of doing this in a single pass over the RDD, so that when f returns true the element goes to rdd1, and to rdd2 otherwise?

Regards,
Anand
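[Editor's note: there is no built-in single-pass split of an RDD into two RDDs; the usual suggestion is to cache the parent so the expensive part (reading and computing the source) happens only once, and the two filter passes scan the cached data. A sketch, assuming f: T => Boolean as in the post.]

// Cache the parent so the source is read/computed only once; both filters
// then run over the cached partitions instead of recomputing the lineage.
val cached = rdd.cache()
val rdd1 = cached.filter(f)
val rdd2 = cached.filter(x => !f(x))

// ...run the actions that consume rdd1/rdd2, then release the memory:
// cached.unpersist()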
Array fields in dataframe.write.jdbc
Hi,

I'm using Spark 1.4. I have an array field in my data frame, and when I try to write this dataframe to Postgres I get the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: Can't translate null value for field StructField(filter,ArrayType(StringType,false),true)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply$mcI$sp(jdbc.scala:182)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply(jdbc.scala:169)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply(jdbc.scala:169)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3.apply(jdbc.scala:168)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3.apply(jdbc.scala:167)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.saveTable(jdbc.scala:167)
  at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:258)
  at analytics.spark.summarizer.SparkBatchSummarizer.start(SparkBatchSummarizer.scala:155)

The schema for the dataframe is:

val schema = StructType(Seq(
  StructField("ts", LongType, false),
  StructField("filter", DataTypes.createArrayType(StringType, false), true),
  StructField("sort_by", StringType, true),
  StructField("user_type", StringType, true),
  StructField("count", LongType, false)
))

Sample dataframe contents:

+----------+-------+-------+---------+-----+
|        ts| filter|sort_by|user_type|count|
+----------+-------+-------+---------+-----+
|1435052400|List(s)|    abc|     null|    1|
|1435065300|List(s)|    abc|     null|    1|
+----------+-------+-------+---------+-----+

org.apache.spark.sql.jdbc.JDBCWriteDetails#saveTable has the following definition, which has no handling for array types:

def saveTable(
    df: DataFrame,
    url: String,
    table: String,
    properties: Properties = new Properties()) {
  val dialect = JdbcDialects.get(url)
  val nullTypes: Array[Int] = df.schema.fields.map { field =>
    dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
      field.dataType match {
        case IntegerType => java.sql.Types.INTEGER
        case LongType => java.sql.Types.BIGINT
        case DoubleType => java.sql.Types.DOUBLE
        case FloatType => java.sql.Types.REAL
        case ShortType => java.sql.Types.INTEGER
        case ByteType => java.sql.Types.INTEGER
        case BooleanType => java.sql.Types.BIT
        case StringType => java.sql.Types.CLOB
        case BinaryType => java.sql.Types.BLOB
        case TimestampType => java.sql.Types.TIMESTAMP
        case DateType => java.sql.Types.DATE
        case DecimalType.Unlimited => java.sql.Types.DECIMAL
        case _ => throw new IllegalArgumentException(
          s"Can't translate null value for field $field")
      })
  }
  val rddSchema = df.schema
  df.foreachPartition { iterator =>
    JDBCWriteDetails.savePartition(url, table, iterator, rddSchema, nullTypes, properties)
  }
}

Is there some way of getting arrays working for now?

Thanks,
Anand
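[Editor's note: one workaround, a sketch rather than a confirmed fix from this thread, is to flatten the array column to a string before calling write.jdbc, so that every column has a type the saveTable translation above can handle. The delimiter choice, the JDBC URL, and the table name below are placeholders; the column names come from the sample schema.]

import java.util.Properties
import org.apache.spark.sql.functions.udf

// Flatten the ArrayType column to delimited text; Postgres then stores it
// as an ordinary string column instead of a native array.
val joinArray = udf { (xs: Seq[String]) =>
  if (xs == null) null else xs.mkString(",")
}

val writable = df.select(
  df("ts"),
  joinArray(df("filter")).as("filter"),
  df("sort_by"),
  df("user_type"),
  df("count"))

val props = new Properties() // user/password etc. as needed
writable.write.jdbc("jdbc:postgresql://host/db", "my_table", props)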