SparkSQL on hive error

2015-10-27 Thread Anand Nalya
Hi,

I have a partitioned table in Hive (Avro) that I can query fine from the Hive
CLI.

When using Spark SQL, I'm able to query some of the partitions, but I'm
getting an exception on some of the partitions.

The query is:

sqlContext.sql("select * from myTable where source='http' and date =
20150812").take(5).foreach(println)

The exception is:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
2.0 (TID 5, node1): java.lang.IllegalArgumentException: Error: type
expected at the position 0 of
'BIGINT:INT:INT:INT:INT:string:INT:string:string:string:string:string:string:string:string:string:string:string:string:string:string:INT:INT:string:BIGINT:string:string:BIGINT:BIGINT:string:string:string:string:string:FLOAT:FLOAT:string:string:string:string:BIGINT:BIGINT:string:string:string:string:string:string:BIGINT:string:string'
but 'BIGINT' is found.
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:348)
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:331)
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseType(TypeInfoUtils.java:392)
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:305)
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfosFromTypeString(TypeInfoUtils.java:762)
at
org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:105)
at
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$4$$anonfun$9.apply(TableReader.scala:191)
at
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$4$$anonfun$9.apply(TableReader.scala:188)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Any pointers on what might be wrong here?
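
One way to narrow this down (a guess from the stack trace, not a confirmed
diagnosis): the parser is rejecting the column-type string stored in the
metastore for that partition, so comparing the failing partition's metadata
against the table-level metadata (or against a partition that works) should
show where the unparseable type string comes from. A small diagnostic sketch;
the table name and partition spec follow the query above:

// Diagnostic sketch only. If DESCRIBE FORMATTED does not pass through
// sqlContext.sql in your setup, the same statements can be run from the
// Hive CLI; compare the SerDe / storage properties of the two outputs.
sqlContext.sql("DESCRIBE FORMATTED myTable").collect().foreach(println)
sqlContext.sql(
  "DESCRIBE FORMATTED myTable PARTITION (source='http', date=20150812)"
).collect().foreach(println)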

Regards,
Anand


Re: Checkpoint file not found

2015-08-03 Thread Anand Nalya
Hi,

It's an application that maintains some state from the DStream using the
updateStateByKey() operation. It then selects some of the records from the
current batch, using some criteria over the current values and the state, and
carries over the remaining values to the next batch.

Following is the pseudo code:

var pending = emptyRDD
val dstream = kafkaStream
val stateStream = dstream.updateStateByKey(myfunc, partitioner, initialState)
val joinedStream = dstream.transformWith(sumstream, transformer(pending) _ )
val toUpdate = joinedStream.filter(myfilter).saveToES()
val toNotUpdate = joinedStream.filter(notFilter).checkpoint(interval)

toNotUpdate.foreachRDD(rdd =>
  pending = rdd
)
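
The FileNotFoundException quoted below usually shows up when a later batch
still references an RDD whose checkpoint files the DStream has already cleaned
up. A sketch of one way to decouple the carried-over RDD (an assumption on my
part, not something confirmed in the thread; it builds on the pseudo code
above and needs a checkpoint directory to be set via ssc.checkpoint):

toNotUpdate.foreachRDD { rdd =>
  pending = rdd.cache()
  pending.checkpoint()   // write an independent copy of the carried-over data
  pending.count()        // force the checkpoint to materialize in this batch
}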

Thanks

On 3 August 2015 at 13:09, Tathagata Das t...@databricks.com wrote:

 Can you tell us more about the streaming app? Which DStream operations are
 you using?

 On Sun, Aug 2, 2015 at 9:14 PM, Anand Nalya anand.na...@gmail.com wrote:

 Hi,

 I'm writing a Streaming application in Spark 1.3. After running for some
 time, I'm getting the following exception. I'm sure that no other process is
 modifying the HDFS file. Any idea what might be the cause of this?

 15/08/02 21:24:13 ERROR scheduler.DAGSchedulerEventProcessLoop:
 DAGSchedulerEventProcessLoop failed; shutting down SparkContext
 java.io.FileNotFoundException: File does not exist:
 hdfs://node16:8020/user/anandnalya/tiered-original/e6794c2c-1c9f-414a-ae7e-e58a8f874661/rdd-5112/part-0
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1132)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1124)
 at
 org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1124)
 at
 org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)
 at
 org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
 at
 org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
 at scala.Option.map(Option.scala:145)
 at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1324)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333

Checkpoint file not found

2015-08-02 Thread Anand Nalya
Hi,

I'm writing a Streaming application in Spark 1.3. After running for some
time, I'm getting the following exception. I'm sure that no other process is
modifying the HDFS file. Any idea what might be the cause of this?

15/08/02 21:24:13 ERROR scheduler.DAGSchedulerEventProcessLoop:
DAGSchedulerEventProcessLoop failed; shutting down SparkContext
java.io.FileNotFoundException: File does not exist:
hdfs://node16:8020/user/anandnalya/tiered-original/e6794c2c-1c9f-414a-ae7e-e58a8f874661/rdd-5112/part-0
at
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1132)
at
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1124)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1124)
at
org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)
at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
at scala.Option.map(Option.scala:145)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1324)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at

Re: updateStateByKey schedule time

2015-07-21 Thread Anand Nalya
I also ran into a similar use case. Is this possible?

On 15 July 2015 at 18:12, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,





 I want to implement a time-out mechanism in the updateStateByKey(…)
 routine.



 But is there a way to retrieve the start time of the batch corresponding to
 the call to my updateStateByKey routine?



 Suppose the streaming application has built up some delay; then
 System.currentTimeMillis() will not be the time at which the batch was
 scheduled.



 I want to retrieve the job/task schedule time of the batch for which my 
 updateStateByKey(..)
 routine is called.



 Is this possible?



 With kind regards,

 Michel Hubert
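
As far as I can tell, updateStateByKey itself does not expose the batch time;
a common workaround (a sketch under that assumption, with illustrative names)
is to attach the batch time to the records via transform, whose callback does
receive the scheduled batch Time, before running the state update:

// Sketch only: `pairs` stands for any DStream[(String, Int)] in the job, and
// stateful operations require ssc.checkpoint(dir) to be set as usual.
val withBatchTime = pairs.transform { (rdd, batchTime) =>
  rdd.mapValues(v => (v, batchTime.milliseconds))
}

// The state now carries the last scheduled batch time each key was seen in,
// which the update function can compare against for time-out logic.
val lastSeen = withBatchTime.updateStateByKey[Long] {
  (values: Seq[(Int, Long)], state: Option[Long]) =>
    if (values.nonEmpty) Some(values.map(_._2).max) else state
}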







Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-10 Thread Anand Nalya
Thanks for the help Dean/TD,

I was able to cut the lineage with checkpointing, using the following code:

dstream.countByValue().foreachRDD((rdd, time) => {
  val joined = rdd.union(current).reduceByKey(_+_, 2).leftOuterJoin(base)
  val toUpdate = joined.filter(myfilter).map(mymap)
  val toNotUpdate = joined.filter(mynotfilter).map(mymap)

  base = base.union(toUpdate).reduceByKey(_+_, 2)
  current = toNotUpdate

  if (time.isMultipleOf(duration)) {
    base.checkpoint()
    current.checkpoint()
  }
  println(toUpdate.count()) // to persistence
})
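
For completeness, a minimal sketch of the setup this snippet assumes (a
reconstruction on my part; variable names follow the thread, while the paths,
batch interval and checkpoint interval are placeholders):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

// `sc` is an existing SparkContext.
val ssc = new StreamingContext(sc, Seconds(30))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints") // needed for checkpoint()

var base: RDD[(String, Long)] = sc.textFile("hdfs:///path/to/base")
  .map(line => (line, 1L)).reduceByKey(_ + _).cache()
var current: RDD[(String, Long)] = sc.emptyRDD[(String, Long)].cache()

val duration = Minutes(2) // checkpoint base/current every 2 minutes of stream time
val dstream: DStream[String] = ssc.textFileStream("hdfs:///path/to/incoming")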

Thanks,
Anand

On 10 July 2015 at 02:16, Tathagata Das t...@databricks.com wrote:

 Summarizing the main problems discussed by Dean

 1. If you have an infinitely growing lineage, bad things will eventually
 happen. You HAVE TO periodically (say, every 10th batch) checkpoint the
 information.

 2. Unpersist the previous `current` RDD ONLY AFTER running an action on
 the `newCurrent`. Otherwise you are throwing current out of the cache
 before newCurrent has been computed. Modifying Dean's example:

 val newCurrent = rdd.union(current).reduceByKey(_+_)
 ...
 // join with newCurrent
 // collect or count or any action that uses newCurrent
 //

 // Now you can unpersist because the newCurrent has been persisted and
 // won't require falling back to this cached current RDD.
 current.unpersist()


 On Thu, Jul 9, 2015 at 6:36 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 I think you're complicating the cache behavior by aggressively re-using
 vars when temporary vals would be more straightforward. For example,
 newBase = newBase.unpersist()... effectively means that newBase's data is
 not actually cached when the subsequent .union(...) is performed, so it
 probably goes back to the lineage... Same with the current.unpersist logic
 before it.

 Names are cheap, so just use local vals:

 val newCurrent = rdd.union(current).reduceByKey(_+_)
 current.unpersist()

 Also, what happens if you omit the 2 argument for the number of
 partitions in reduceByKey?

 Other minor points:

 I would change the joined, toUpdate, toNotUpdate logic to this:

 val joined = current.leftOuterJoin(newBase).map(mymap).cache()

 val toUpdate = joined.filter(myfilter).cache()
 val toNotUpdate = joined.filter(mynotfilter).cache()


 Maybe it's just for this email example, but you don't need to call
 collect on toUpdate before using foreach(println). If the RDD is huge, you
 definitely don't want to do that.

 Hope this helps.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 Yes, myRDD is outside of the DStream. Following is the actual code, where
 newBase and current are the RDDs being updated with each batch:

   val base = sc.textFile...
   var newBase = base.cache()

   val dstream: DStream[String] = ssc.textFileStream...
   var current: RDD[(String, Long)] = sc.emptyRDD.cache()

   dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {

 current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

 val joined = current.leftOuterJoin(newBase).cache()
 val toUpdate = joined.filter(myfilter).map(mymap).cache()
 val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

 toUpdate.collect().foreach(println) // this goes to some store

 newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_,
 2).cache()

 current = toNotUpdate.cache()

 toUpdate.unpersist()
 joined.unpersist()
 rdd.unpersist()
   })


 Regards,

 Anand


 On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote:

 Is myRDD outside a DStream? If so are you persisting on each batch
 iteration? It should be checkpointed frequently too.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 The data coming from the dstream has the same keys that are in myRDD, so
 the reduceByKey after the union keeps the overall tuple count in myRDD
 fixed. Or, even with a fixed tuple count, will it keep consuming more
 resources?

 On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

 If you are continuously unioning RDDs, then you are accumulating ever
 increasing data, and you are processing an ever increasing amount of data in
 every batch. Obviously this is not going to last for very long. You
 fundamentally cannot keep processing an ever increasing amount of data with
 finite resources, isn't it?

 On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 That's from the Streaming tab of the Spark 1.4 Web UI.

 On 9 July 2015 at 15:35, Michel

Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
The data coming from the dstream has the same keys that are in myRDD, so the
reduceByKey after the union keeps the overall tuple count in myRDD fixed. Or,
even with a fixed tuple count, will it keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

 If you are continuously unioning RDDs, then you are accumulating ever
 increasing data, and you are processing an ever increasing amount of data in
 every batch. Obviously this is not going to last for very long. You
 fundamentally cannot keep processing an ever increasing amount of data with
 finite resources, isn't it?

 On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote:

 That's from the Streaming tab of the Spark 1.4 Web UI.

 On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated the second image with the charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I have an application in which an RDD is being updated with tuples coming
 from the RDDs in a DStream, with the following pattern.



 dstream.foreachRDD(rdd => {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointing to cache results. Over time, the
 lineage of myRDD keeps increasing and the number of stages in each batch of
 the dstream keeps increasing, even though all the earlier stages are skipped.
 When the number of stages grows big enough, the overall delay due to
 scheduling starts increasing. The processing time for each batch is still
 fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand






Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
Hi,

I have an application in which an RDD is being updated with tuples coming
from the RDDs in a DStream, with the following pattern.

dstream.foreachRDD(rdd => {
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
})

I'm using cache() and checkpointing to cache results. Over time, the lineage
of myRDD keeps increasing and the number of stages in each batch of the
dstream keeps increasing, even though all the earlier stages are skipped.
When the number of stages grows big enough, the overall delay due to
scheduling starts increasing. The processing time for each batch is still
fixed.

Following figures illustrate the problem:

Job execution: https://i.imgur.com/GVHeXH3.png?1


Delays: https://i.imgur.com/1DZHydw.png?1


Is there some pattern that I can use to avoid this?

Regards,
Anand


Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
That's from the Streaming tab of the Spark 1.4 Web UI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated the second image with the charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I have an application in which an RDD is being updated with tuples coming
 from the RDDs in a DStream, with the following pattern.



 dstream.foreachRDD(rdd => {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointing to cache results. Over time, the
 lineage of myRDD keeps increasing and the number of stages in each batch of
 the dstream keeps increasing, even though all the earlier stages are skipped.
 When the number of stages grows big enough, the overall delay due to
 scheduling starts increasing. The processing time for each batch is still
 fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand



Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
Yes, myRDD is outside of the DStream. Following is the actual code, where
newBase and current are the RDDs being updated with each batch:

  val base = sc.textFile...
  var newBase = base.cache()

  val dstream: DStream[String] = ssc.textFileStream...
  var current: RDD[(String, Long)] = sc.emptyRDD.cache()

  dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {

current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

val joined = current.leftOuterJoin(newBase).cache()
val toUpdate = joined.filter(myfilter).map(mymap).cache()
val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

toUpdate.collect().foreach(println) // this goes to some store

newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_,
2).cache()

current = toNotUpdate.cache()

toUpdate.unpersist()
joined.unpersist()
rdd.unpersist()
  })


Regards,

Anand


On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote:

 Is myRDD outside a DStream? If so are you persisting on each batch
 iteration? It should be checkpointed frequently too.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote:

 The data coming from the dstream has the same keys that are in myRDD, so the
 reduceByKey after the union keeps the overall tuple count in myRDD fixed.
 Or, even with a fixed tuple count, will it keep consuming more resources?

 On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:

 If you are continuously unioning RDDs, then you are accumulating ever
 increasing data, and you are processing an ever increasing amount of data in
 every batch. Obviously this is not going to last for very long. You
 fundamentally cannot keep processing an ever increasing amount of data with
 finite resources, isn't it?

 On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com
 wrote:

 That's from the Streaming tab of the Spark 1.4 Web UI.

 On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:

  Hi,



 I was just wondering how you generated the second image with the charts.

 What product?



 *From:* Anand Nalya [mailto:anand.na...@gmail.com]
 *Sent:* donderdag 9 juli 2015 11:48
 *To:* spark users
 *Subject:* Breaking lineage and reducing stages in Spark Streaming



 Hi,



 I have an application in which an RDD is being updated with tuples
 coming from the RDDs in a DStream, with the following pattern.



 dstream.foreachRDD(rdd => {

   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)

 })



 I'm using cache() and checkpointing to cache results. Over time, the
 lineage of myRDD keeps increasing and the number of stages in each batch of
 the dstream keeps increasing, even though all the earlier stages are skipped.
 When the number of stages grows big enough, the overall delay due to
 scheduling starts increasing. The processing time for each batch is still
 fixed.



 Following figures illustrate the problem:



 Job execution: https://i.imgur.com/GVHeXH3.png?1


 Delays: https://i.imgur.com/1DZHydw.png?1


 Is there some pattern that I can use to avoid this?



 Regards,

 Anand








[no subject]

2015-07-07 Thread Anand Nalya
Hi,

Suppose I have an RDD that is loaded from some file, and I also have a
DStream with data coming from some stream. I want to keep unioning some of
the tuples from the DStream into my RDD. For this I can use something like
this:

  var myRDD: RDD[(String, Long)] = sc.fromText...
  dstream.foreachRDD { rdd =>
    myRDD = myRDD.union(rdd.filter(myfilter))
  }

My question is: for how long will Spark keep the RDDs underlying the DStream
around? Is there some configuration knob that can control that?

Regards,
Anand


Split RDD into two in a single pass

2015-07-06 Thread Anand Nalya
Hi,

I have an RDD which I want to split into two disjoint RDDs with a boolean
function. I can do this with the following:

val rdd1 = rdd.filter(f)
val rdd2 = rdd.filter(fnot)

I'm assuming that each of the above statements will traverse the RDD once,
thus resulting in two passes.

Is there a way of doing this in a single pass over the RDD, so that when f
returns true the element goes to rdd1, and to rdd2 otherwise?
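
As far as I know there is no built-in operator that splits one RDD into two
in a single pass; a common workaround (a sketch, not a confirmed answer, with
illustrative data and predicate) is to cache the source so the two filters
share one computation, or to evaluate the predicate once and split on a tag:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch; the data and predicate are illustrative.
val sc = new SparkContext(new SparkConf().setAppName("split-sketch").setMaster("local[*]"))
val rdd = sc.parallelize(1 to 10)
val f = (x: Int) => x % 2 == 0

// Option 1: cache so the two filters don't recompute the source lineage.
val cached = rdd.cache()
val rdd1 = cached.filter(f)
val rdd2 = cached.filter(x => !f(x))

// Option 2: evaluate f once per element, then split on the cached tag.
val tagged = rdd.map(x => (f(x), x)).cache()
val pass = tagged.filter(_._1).values
val fail = tagged.filter(t => !t._1).values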

Regards,
Anand


Array fields in dataframe.write.jdbc

2015-07-02 Thread Anand Nalya
Hi,

I'm using Spark 1.4. I have an array field in my DataFrame, and when I try to
write this DataFrame to Postgres, I get the following exception:

Exception in thread main java.lang.IllegalArgumentException: Can't
translate null value for field
StructField(filter,ArrayType(StringType,false),true)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply$mcI$sp(jdbc.scala:182)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply(jdbc.scala:169)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply(jdbc.scala:169)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3.apply(jdbc.scala:168)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3.apply(jdbc.scala:167)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.saveTable(jdbc.scala:167)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:258)
at
analytics.spark.summarizer.SparkBatchSummarizer.start(SparkBatchSummarizer.scala:155)

The schema for the dataframe is:

val schema = StructType(Seq(
  StructField("ts", LongType, false),
  StructField("filter", DataTypes.createArrayType(StringType, false), true),
  StructField("sort_by", StringType, true),
  StructField("user_type", StringType, true),
  StructField("count", LongType, false)
))

Sample dataframe contents:

+----------+-------+-------+---------+-----+
|        ts| filter|sort_by|user_type|count|
+----------+-------+-------+---------+-----+
|1435052400|List(s)|    abc|     null|    1|
|1435065300|List(s)|    abc|     null|    1|
+----------+-------+-------+---------+-----+

org.apache.spark.sql.jdbc.JDBCWriteDetails#saveTable has the following
definition, which does not handle array types:

  def saveTable(
      df: DataFrame,
      url: String,
      table: String,
      properties: Properties = new Properties()) {
    val dialect = JdbcDialects.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
        field.dataType match {
          case IntegerType => java.sql.Types.INTEGER
          case LongType => java.sql.Types.BIGINT
          case DoubleType => java.sql.Types.DOUBLE
          case FloatType => java.sql.Types.REAL
          case ShortType => java.sql.Types.INTEGER
          case ByteType => java.sql.Types.INTEGER
          case BooleanType => java.sql.Types.BIT
          case StringType => java.sql.Types.CLOB
          case BinaryType => java.sql.Types.BLOB
          case TimestampType => java.sql.Types.TIMESTAMP
          case DateType => java.sql.Types.DATE
          case DecimalType.Unlimited => java.sql.Types.DECIMAL
          case _ => throw new IllegalArgumentException(
            s"Can't translate null value for field $field")
        })
    }

    val rddSchema = df.schema
    df.foreachPartition { iterator =>
      JDBCWriteDetails.savePartition(url, table, iterator, rddSchema,
        nullTypes, properties)
    }
  }

Is there some way of getting arrays working for now?
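
One possible workaround for the time being (a sketch on my part, not a
confirmed fix; the helper name, the filter_str column, the url, table name
and connection properties are all illustrative): serialize the array column
to a plain string before writing, since the 1.4 JDBC writer has no ArrayType
mapping.

import org.apache.spark.sql.functions.udf

// Join the array into a delimited string; nulls stay null.
val arrayToString = udf((a: Seq[String]) => if (a == null) null else a.mkString(","))

val writable = df
  .withColumn("filter_str", arrayToString(df("filter")))
  .drop("filter")

// url, table name and connectionProperties are placeholders.
writable.write.jdbc(url, "my_summary_table", connectionProperties)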

Thanks,
Anand