[Structured Streaming] Connecting to Kafka via a Custom Consumer / Producer

2020-04-22 Thread Patrick McGloin
Hi,

The large international bank I work for has a custom Kafka implementation.
The client libraries that are used to connect to Kafka have extra security
steps.  They implement the Kafka Consumer and Producer interfaces in this
client library so once we use it to connect to Kafka, we can treat our
connections as the standard Kafka interfaces in our code.

We can't use the out-of-the-box Kafka connector from Structured Streaming
because it only works with a KafkaConsumer directly, not the Consumer interface.

Would it be possible / advisable / a good idea to change this to use the
Consumer interface and allow users to specify a callback somehow to create
their own connection to Kafka?

So the signature of this private method in InternalKafkaConsumer would
change to use the Consumer interface (as would the rest of the code base)
and somehow users are given the option to create their own Consumer if they
wanted.  The same would apply for Producers.

/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
  ...
  val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParamsWithSecurity)
  ...
}
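
One possible shape for this, as a rough sketch (the ConsumerFactory trait and
the way it would be wired into the connector are hypothetical, not existing
Spark API):

import java.{util => ju}
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}

// Hypothetical extension point: users supply a factory, and Spark only
// depends on the Consumer interface.
trait ConsumerFactory extends Serializable {
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}

// The default keeps today's behaviour.
object DefaultConsumerFactory extends ConsumerFactory {
  override def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] =
    new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
}

// InternalKafkaConsumer would then hold a Consumer rather than a KafkaConsumer:
// private def createConsumer(): Consumer[Array[Byte], Array[Byte]] =
//   consumerFactory.createConsumer(kafkaParamsWithSecurity)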

At the moment we are left with two options. We can copy the Spark code base
and swap in our custom Consumer for the KafkaConsumer used in that function
(plus a few other changes), which leaves us with a fork that will drift out
of sync over time. Or we can build and maintain our own custom connector.

Best regards,
Patrick


Re: Spark Structured Streaming resource contention / memory issue

2018-10-15 Thread Patrick McGloin
Hi Jungtaek,

Thanks, we thought that might be the issue but haven't tested yet as
building against an unreleased version of Spark is tough for us, due to
network restrictions. We will try though. I will report back if we find
anything.

Best regards,
Patrick

On Fri, Oct 12, 2018, 2:57 PM Jungtaek Lim  wrote:

> Hi Patrick,
>
> Looks like you might be struggling with state memory, which multiple
> issues are going to be resolved in Spark 2.4.
>
> 1. SPARK-24441 [1]: Expose total estimated size of states in
> HDFSBackedStateStoreProvider
> 2. SPARK-24637 [2]: Add metrics regarding state and watermark to
> dropwizard metrics
> 3. SPARK-24717 [3]: Split out min retain version of state for memory in
> HDFSBackedStateStoreProvider
>
> There are other patches relevant to the state store as well, but the above
> issues apply to map/flatMapGroupsWithState.
>
> Since the Spark community is in the process of releasing Spark 2.4.0, could
> you try experimenting with a Spark 2.4.0 RC if you don't mind? You could also
> try applying the individual patches and see whether they help.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://issues.apache.org/jira/browse/SPARK-24441
> 2. https://issues.apache.org/jira/browse/SPARK-24637
> 3. https://issues.apache.org/jira/browse/SPARK-24717
>
>
> On Fri, 12 Oct 2018 at 21:31, Patrick McGloin wrote:
>
>> Hi all, I sent this earlier but the screenshots were not attached.
>> Hopefully this time it is correct.
>>
>> We have a Spark Structured Streaming query which is using
>> mapGroupsWithState. After some time of processing in a stable manner,
>> suddenly each mini batch starts taking 40 seconds. Suspiciously, it looks
>> like exactly 40 seconds each time. Before this the batches were taking less
>> than a second.
>>
>>
>> Looking at the details for a particular task most partitions are
>> processed really quickly but a few take exactly 40 seconds:
>>
>>
>>
>>
>> The GC was looking ok as the data was being processed quickly but
>> suddenly the full GCs etc stop (at the same time as the 40 second issue):
>>
>>
>>
>> I have taken a thread dump from one of the executors as this issue is
>> happening but I cannot see any resource they are blocked on:
>>
>>
>>
>>
>> Are we hitting a GC problem and why is it manifesting in this way? Is
>> there another resource that is blocking and what is it?
>>
>>
>> Thanks,
>> Patrick
>>
>>
>>
>>
>


Spark Structured Streaming resource contention / memory issue

2018-10-12 Thread Patrick McGloin
Hi all,

We have a Spark Structured Streaming query which is using
mapGroupsWithState. After some time of processing in a stable manner,
suddenly each mini batch starts taking 40 seconds. Suspiciously, it looks
like exactly 40 seconds each time. Before this the batches were taking less
than a second.


Looking at the details for a particular task most partitions are processed
really quickly but a few take exactly 40 seconds:




The GC was looking ok as the data was being processed quickly but suddenly
the full GCs etc stop (at the same time as the 40 second issue):



I have taken a thread dump from one of the executors as this issue is
happening but I cannot see any resource they are blocked on:




Are we hitting a GC problem and why is it manifesting in this way? Is there
another resource that is blocking and what is it?


Thanks,
Patrick


Re: How to Create one DB connection per executor and close it after the job is done?

2018-07-30 Thread Patrick McGloin
You could use an object in Scala, of which only one instance will be
created on each JVM / Executor. E.g.

object MyDatabaseSingleton {
  var dbConn = ???
}
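
A slightly fuller sketch of that pattern with plain JDBC (the URL and
credentials are placeholders):

import java.sql.{Connection, DriverManager}

object MyDatabaseSingleton {
  // Initialised lazily, once per JVM (i.e. once per executor). The JDBC URL
  // and credentials below are placeholders.
  lazy val dbConn: Connection = {
    val conn = DriverManager.getConnection("jdbc:postgresql://db-host:5432/mydb", "user", "password")
    // Close the connection when the executor JVM shuts down.
    sys.addShutdownHook { if (!conn.isClosed) conn.close() }
    conn
  }
}

// Usage inside tasks: the first task on each executor opens the connection,
// later tasks on the same executor reuse it.
// rdd.foreachPartition { rows =>
//   val conn = MyDatabaseSingleton.dbConn
//   rows.foreach { row => /* use conn */ }
// }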

On Sat, 28 Jul 2018, 08:34 kant kodali,  wrote:

> Hi All,
>
> I understand creating a connection per foreachPartition, but I am wondering:
> can I create one DB connection per executor and close it after the job is
> done? Any sample code would help. You can imagine I am running a simple batch
> processing application.
>
> Thanks!
>


Re: How to handle java.sql.Date inside Maps with to_json / from_json

2018-06-28 Thread Patrick McGloin
Hi all,

I tested this with a Date outside a map and it works fine so I think the
issue is simply for Dates inside Maps. I will create a Jira for this unless
there are objections.

Best regards,
Patrick

On Thu, 28 Jun 2018, 11:53 Patrick McGloin, 
wrote:

> Consider the following test, which will fail on the final show:
>
> case class UnitTestCaseClassWithDateInsideMap(map: Map[Date, Int])
>
> test("Test a Date as key in a Map") {
>   val map = UnitTestCaseClassWithDateInsideMap(Map(Date.valueOf("2018-06-28") -> 1))
>   val options = Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss.SSS",
>     "dateFormat" -> "yyyy/MM/dd")
>   val schema = Encoders.product[UnitTestCaseClassWithDateInsideMap].schema
>
>   val mapDF = Seq(map).toDF()
>   val jsonDF = mapDF.select(to_json(struct(mapDF.columns.head,
>     mapDF.columns.tail:_*), options))
>   jsonDF.show()
>
>   val jsonString = jsonDF.map(_.getString(0)).collect().head
>
>   val stringDF = Seq(jsonString).toDF("json")
>   val parsedDF = stringDF.select(from_json($"json", schema, options))
>   parsedDF.show()
> }
>
>
> The result of the line "jsonDF.show()" is as follows:
>
> +---------------------------------------------------+
> |structstojson(named_struct(NamePlaceholder(), map)) |
> +---------------------------------------------------+
> |                                 {"map":{"17710":1}}|
> +---------------------------------------------------+
>
> As can be seen the date is not formatted correctly.  The error with
>  "parsedDF.show()" is:
>
> java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String
> cannot be cast to java.lang.Integer
>
> I have tried adding the options to to_json / from_json but it hasn't
> helped.  Am I using the wrong options?
>
> Is there another way to do this?
>
> Best regards,
> Patrick
>


How to handle java.sql.Date inside Maps with to_json / from_json

2018-06-28 Thread Patrick McGloin
Consider the following test, which will fail on the final show:

case class UnitTestCaseClassWithDateInsideMap(map: Map[Date, Int])

test("Test a Date as key in a Map") {
  val map = UnitTestCaseClassWithDateInsideMap(Map(Date.valueOf("2018-06-28") -> 1))
  val options = Map("timestampFormat" -> "yyyy/MM/dd HH:mm:ss.SSS",
    "dateFormat" -> "yyyy/MM/dd")
  val schema = Encoders.product[UnitTestCaseClassWithDateInsideMap].schema

  val mapDF = Seq(map).toDF()
  val jsonDF = mapDF.select(to_json(struct(mapDF.columns.head,
    mapDF.columns.tail:_*), options))
  jsonDF.show()

  val jsonString = jsonDF.map(_.getString(0)).collect().head

  val stringDF = Seq(jsonString).toDF("json")
  val parsedDF = stringDF.select(from_json($"json", schema, options))
  parsedDF.show()
}


The result of the line "jsonDF.show()" is as follows:

+---------------------------------------------------+
|structstojson(named_struct(NamePlaceholder(), map)) |
+---------------------------------------------------+
|                                 {"map":{"17710":1}}|
+---------------------------------------------------+

As can be seen the date is not formatted correctly.  The error with
 "parsedDF.show()" is:

java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String
cannot be cast to java.lang.Integer

I have tried adding the options to to_json / from_json but it hasn't
helped.  Am I using the wrong options?

Is there another way to do this?
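
In the meantime, a possible (untested) workaround sketch is to convert the
Date keys to strings before calling to_json, so the JSON map keys round-trip
as plain "yyyy-MM-dd" strings. The WithStringKeys case class is made up for
this sketch and should live next to UnitTestCaseClassWithDateInsideMap
(outside the test body); the usual spark.implicits._ is assumed to be in
scope, as in the test above.

import org.apache.spark.sql.functions.{struct, to_json}

// Define alongside UnitTestCaseClassWithDateInsideMap, not inside the test.
case class WithStringKeys(map: Map[String, Int])

val stringKeyedDF = Seq(map).toDS()
  .map(v => WithStringKeys(v.map.map { case (d, i) => d.toString -> i }))
  .toDF()

val jsonDF = stringKeyedDF.select(to_json(struct(stringKeyedDF.columns.head,
  stringKeyedDF.columns.tail: _*)))
jsonDF.show(false)  // expected: {"map":{"2018-06-28":1}}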

Best regards,
Patrick


Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-19 Thread Patrick McGloin
# Write key-value data from a DataFrame to a Kafka topic specified in an option
query = df \
  .selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .option("checkpointLocation", "/path/to/HDFS/dir") \
  .start()

Described here:

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html



On 19 May 2017 at 10:45,  wrote:

> Is there a Kafka sink for Spark Structured Streaming ?
>
> Sent from my iPhone
>


Re: Structured Streaming + initialState

2017-05-06 Thread Patrick McGloin
The initial state is stored in a Parquet file, which is effectively a static
Dataset.  I see there is a Jira open for full joins on streaming plus
static Datasets for Structured Streaming (SPARK-20002
<https://issues.apache.org/jira/browse/SPARK-20002>).  So once that Jira is
completed it would be possible.

For mapGroupsWithState it would be great if you could provide an
initialState Dataset with Key -> State initial values.
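
In the meantime, a rough workaround sketch: read the Parquet state once,
broadcast it, and seed each key's state the first time it is seen. The Event
and RunningTotal classes and the path are made up, and this assumes the
Spark 2.2-style GroupState API and the usual spark.implicits._ in scope.

import org.apache.spark.sql.streaming.GroupState

// Hypothetical types for illustration.
case class Event(key: String, value: Long)
case class RunningTotal(total: Long)

// Read the initial state once on the driver and broadcast it (assumes it is
// small enough to fit in memory).
val initialState: Map[String, RunningTotal] =
  spark.read.parquet("/path/to/initial-state")  // placeholder path
    .as[(String, Long)]
    .collect()
    .map { case (k, v) => k -> RunningTotal(v) }
    .toMap
val initialStateBc = spark.sparkContext.broadcast(initialState)

def updateState(key: String, events: Iterator[Event],
                state: GroupState[RunningTotal]): RunningTotal = {
  // Seed from the broadcast map the first time a key is seen.
  val current = state.getOption
    .orElse(initialStateBc.value.get(key))
    .getOrElse(RunningTotal(0L))
  val updated = RunningTotal(current.total + events.map(_.value).sum)
  state.update(updated)
  updated
}

// events.groupByKey(_.key).mapGroupsWithState(updateState _)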

On 5 May 2017 at 23:49, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Can you explain how your initial state is stored? Is it a file, or is it in
> a database?
> If it's in a database, then when you initialize the GroupState, you can fetch
> it from the database.
>
> On Fri, May 5, 2017 at 7:35 AM, Patrick McGloin <mcgloin.patr...@gmail.com
> > wrote:
>
>> Hi all,
>>
>> With Spark Structured Streaming, is there a possibility to set an
>> "initial state" for a query?
>>
>> A join between a streaming Dataset and a static Dataset does not
>> support full joins.
>>
>> Using mapGroupsWithState to create a GroupState does not support an
>> initialState (as the Spark Streaming StateSpec did).
>>
>> Are there any plans to add support for initial states?  Or is there
>> already a way to do so?
>>
>> Best regards,
>> Patrick
>>
>
>


Structured Streaming + initialState

2017-05-05 Thread Patrick McGloin
Hi all,

With Spark Structured Streaming, is there a possibility to set an "initial
state" for a query?

A join between a streaming Dataset and a static Dataset does not
support full joins.

Using mapGroupsWithState to create a GroupState does not support an
initialState (as the Spark Streaming StateSpec did).

Are there any plans to add support for initial states?  Or is there already
a way to do so?

Best regards,
Patrick


Streaming WriteAheadLogBasedBlockHandler disallows parellism via StorageLevel replication factor

2016-04-13 Thread Patrick McGloin
Hi all,

If I am using a Custom Receiver with Storage Level set to
StorageLevel.MEMORY_ONLY_SER_2 and the WAL enabled I get this Warning in the logs:

16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: Storage level
replication 2 is unnecessary when write ahead log is enabled, change
to replication 1
16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: User defined
storage level StorageLevel(false, true, false, false, 2) is changed to
effective storage level StorageLevel(false, true, false, false, 1)
when write ahead log is enabled


My application is running on 4 Executors with 4 cores each, and 1
Receiver.  Because the data is not replicated the processing runs on only
one Executor:

[image: Inline images 1]

Instead of 16 cores processing the Streaming data only 4 are being used.

We cannot repartition the DStream to distribute data to more Executors since,
if you call repartition on an RDD which is only located on one node, the new
partitions are only created on that node, which doesn't help.  This theory
that repartitioning doesn't help can be tested with this simple example,
which tries to go from one partition on a single node to many on many
nodes.  What you find when you look at the multiplePartitions RDD in
the UI is that its 6 partitions are on the same Executor.

scala> val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 4).cache.setName("rdd")
rdd: org.apache.spark.rdd.RDD[Int] = rdd ParallelCollectionRDD[0] at
parallelize at <console>:27

scala> rdd.count()
res0: Long = 6

scala> val singlePartition = rdd.repartition(1).cache.setName("singlePartition")
singlePartition: org.apache.spark.rdd.RDD[Int] = singlePartition
MapPartitionsRDD[4] at repartition at <console>:29

scala> singlePartition.count()
res1: Long = 6

scala> val multiplePartitions =
singlePartition.repartition(6).cache.setName("multiplePartitions")
multiplePartitions: org.apache.spark.rdd.RDD[Int] = multiplePartitions
MapPartitionsRDD[8] at repartition at <console>:31

scala> multiplePartitions.count()
res2: Long = 6

Am I correct in the use of repartition, that the data does not get
shuffled if it is all on one Executor?

Shouldn't I be allowed to set the Receiver replication factor to two
when the WAL is enabled so that multiple Executors can work on the
Streaming input data?

We will look into creating 4 Receivers so that the data gets distributed
more evenly.  But won't that "waste" 4 cores in our example, where one
would do?
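
For reference, a minimal sketch of the multi-receiver approach (the receiver
class, basePort and the placeholder processing are assumptions):

// Run one receiver per desired input partition and union the resulting
// streams; each receiver occupies one core on some executor.
val numReceivers = 4
val streams = (0 until numReceivers).map { i =>
  ssc.receiverStream(new MyCustomReceiver(basePort + i))  // hypothetical receiver
}
val unioned = ssc.union(streams)
unioned.foreachRDD { rdd => println(rdd.count()) }  // placeholder processing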

Best regards,
Patrick


Understanding Spark Task failures

2016-01-28 Thread Patrick McGloin
I am trying to understand what will happen when Spark has an exception
during processing, especially while streaming.

If I have a small code snippet like this:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  println(s"processed => [${rdd.collect().toList}]")
  throw new Exception("User exception...")
}

If I run this I will get output like this:

[info] processed => [List(Item1)]
[error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job
streaming job 1453999278000 ms.0
[error] java.lang.Exception: User exception...
...
[info] processed => [List(Item2)]
[error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job
streaming job 1453999279000 ms.0
[error] java.lang.Exception: User exception...

First "Item1" is processed, and it fails (of course). In the next batch
"Item2" is processed. The record "Item1" has now been lost.

If I change my code so that the exception occurs inside a task:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  println(s"processed => [${rdd.collect().toList}]")
  rdd.map{case x => throw new Exception("User exception...") }.collect()
}

Then the map closure will be retried, but once it has failed enough times
the record is discarded and processing continues to the next record.

Is it possible to ensure that records are not discarded, even if this means
stopping the application? I have the WAL enabled.
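
One pattern that at least fails fast, instead of silently moving on to the
next batch, is to catch the failure in the output action and stop the
StreamingContext. A rough sketch only; it does not address executor-side
task retries, and stopping the context from inside a job should be treated
as a last resort:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  try {
    // Process the whole batch; collect() is only for this toy example.
    println(s"processed => [${rdd.collect().toList}]")
  } catch {
    case e: Throwable =>
      // Stop the whole application rather than let the scheduler move on to
      // the next batch and drop this one. Stop from a separate thread so we
      // don't block the job thread that is being torn down.
      new Thread("streaming-fail-fast") {
        override def run(): Unit =
          ssc.stop(stopSparkContext = true, stopGracefully = false)
      }.start()
      throw e
  }
}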


Re: Understanding Spark Task failures

2016-01-28 Thread Patrick McGloin
Hi Tathagata,

Thanks for the response.  I can add in a try catch myself and handle user
exceptions, that's true, so maybe my example wasn't a very good one.  I'm
more worried about OOM exceptions and other run-time exceptions (that could
happen outside my try catch).

For example, I have this periodic "java.io.IOException: Class not found"
exception at the moment:

https://forums.databricks.com/questions/6601/javaioioexception-class-not-found-on-long-running.html

After this happens I lose data even though I have the WAL setup.  With the
WAL I can ensure that the data is safely stored when it has come into the
system from an external source, and I only ACK the external source after it
has been stored.  But it seems that there is no guarantee that the data is
successfully processed?

I assume I am right in what I am saying about losing data with the WAL
setup correctly.  The WAL works when stopping and starting the application,
etc.  But something is not handling the run time exception well.  This was
the start of my investigation into what is going wrong, so of course there
could be another reason for what I'm seeing.



On 28 January 2016 at 21:43, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> That is hard to guarantee by the system, and it is up to the app developer
> to ensure that this does not happen. For example, if the data in a message is
> corrupted, unless the app code is robust towards handling such data, the
> system will fail every time it retries that app code.
>
> On Thu, Jan 28, 2016 at 8:51 AM, Patrick McGloin <
> mcgloin.patr...@gmail.com> wrote:
>
>> I am trying to understand what will happen when Spark has an exception
>> during processing, especially while streaming.
>>
>> If I have a small code snippet like this:
>>
>> myDStream.foreachRDD { (rdd: RDD[String]) =>
>>   println(s"processed => [${rdd.collect().toList}]")
>>   throw new Exception("User exception...")
>> }
>>
>> If I run this I will get output like this:
>>
>> [info] processed => [List(Item1)]
>> [error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job 
>> streaming job 1453999278000 ms.0
>> [error] java.lang.Exception: User exception...
>> ...
>> [info] processed => [List(Item2)]
>> [error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job 
>> streaming job 1453999279000 ms.0
>> [error] java.lang.Exception: User exception...
>>
>> First "Item1" is processed, and it fails (of course). In the next batch
>> "Item2" is processed. The record "Item1" has now been lost.
>>
>> If I change my code so that the exception occurs inside a task:
>>
>> myDStream.foreachRDD { (rdd: RDD[String]) =>
>>   println(s"processed => [${rdd.collect().toList}]")
>>   rdd.map{case x => throw new Exception("User exception...") }.collect()
>> }
>>
>> Then the map closure will be retried, but once it has failed enough times
>> the record is discarded and processing continues to the next record.
>>
>> Is it possible to ensure that records are not discarded, even if this
>> means stopping the application? I have the WAL enabled.
>>
>
>


“java.io.IOException: Class not found” on long running Streaming application

2016-01-28 Thread Patrick McGloin
I am getting the exception below on a long running Spark Streaming
application. The exception could occur after a few minutes, but it may also
not happen for days. This is with pretty consistent input data.

I have seen this Jira ticket (
https://issues.apache.org/jira/browse/SPARK-6152) but I don't think it is
the same issue. That is a java.lang.IllegalArgumentException and this is a
java.io.IOException: Class not found.

My application is streaming data and writing to Parquet using Spark SQL.

I am using Spark 1.5.2. Any ideas?

28-01-2016 09:36:00 ERROR JobScheduler:96 - Error generating jobs for
time 145397376 ms
java.io.IOException: Class not found
at 
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.a(Unknown
Source)
at 
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
Source)
at 
org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)
at 
org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:81)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:187)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.RDD.map(RDD.scala:317)
at 
org.apache.spark.streaming.dstream.MappedDStream$$anonfun$compute$1.apply(MappedDStream.scala:35)
at 
org.apache.spark.streaming.dstream.MappedDStream$$anonfun$compute$1.apply(MappedDStream.scala:35)
at scala.Option.map(Option.scala:145)
at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at 
org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:35)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStrea


Re: Spark Streaming Write Ahead Log (WAL) not replaying data after restart

2016-01-26 Thread Patrick McGloin
Thank you Shixiong, that is what I was missing.
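
For the archives, a minimal sketch of that getOrCreate pattern applied to the
test application below (the checkpoint path and batch interval are the ones
from my original mail):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest"

def createContext(): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("DStreamResilienceTest")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  // Define the DStream graph *inside* the create function so it can be
  // restored from the checkpoint on restart.
  val stream = ssc.receiverStream(new InMemoryStringReceiver())
  stream.foreachRDD(rdd => println(s"processed => [${rdd.collect().toList}]"))
  ssc
}

// On a clean start this calls createContext(); after a restart it rebuilds
// the context from the checkpoint and replays the WAL data instead.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()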

On 26 January 2016 at 00:27, Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> You need to define a create function and use StreamingContext.getOrCreate.
> See the example here:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing
>
> On Thu, Jan 21, 2016 at 3:32 AM, Patrick McGloin <
> mcgloin.patr...@gmail.com> wrote:
>
>> Hi all,
>>
>> To have a simple way of testing the Spark Streaming Write Ahead Log I
>> created a very simple Custom Input Receiver, which will generate strings
>> and store those:
>>
>> class InMemoryStringReceiver extends 
>> Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
>>
>>   val batchID = System.currentTimeMillis()
>>
>>   def onStart() {
>> new Thread("InMemoryStringReceiver") {
>>   override def run(): Unit = {
>> var i = 0
>> while(true) {
>>   
>> //http://spark.apache.org/docs/latest/streaming-custom-receivers.html
>>   //To implement a reliable receiver, you have to use 
>> store(multiple-records) to store data.
>>   store(ArrayBuffer(s"$batchID-$i"))
>>   println(s"Stored => [$batchID-$i)]")
>>   Thread.sleep(1000L)
>>   i = i + 1
>> }
>>   }
>> }.start()
>>   }
>>
>>   def onStop() {}
>> }
>>
>> I then created a simple Application which will use the Custom Receiver to
>> stream the data and process it:
>>
>> object DStreamResilienceTest extends App {
>>
>>   val conf = new 
>> SparkConf().setMaster("local[*]").setAppName("DStreamResilienceTest").set("spark.streaming.receiver.writeAheadLog.enable",
>>  "true")
>>   val ssc = new StreamingContext(conf, Seconds(1))
>>   
>> ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
>>   val customReceiverStream: ReceiverInputDStream[String] = 
>> ssc.receiverStream(new InMemoryStringReceiver())
>>   customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
>> println(s"processed => [${rdd.collect().toList}]")
>> Thread.sleep(2000L)
>>   }
>>   ssc.start()
>>   ssc.awaitTermination()
>>
>> }
>>
>> As you can see the processing of each received RDD has sleep of 2 seconds
>> while the Strings are stored every second. This creates a backlog and the
>> new strings pile up, and should be stored in the WAL. Indeed, I can see the
>> files in the checkpoint dirs getting updated. Running the app I get output
>> like this:
>>
>> [info] Stored => [1453374654941-0)]
>> [info] processed => [List(1453374654941-0)]
>> [info] Stored => [1453374654941-1)]
>> [info] Stored => [1453374654941-2)]
>> [info] processed => [List(1453374654941-1)]
>> [info] Stored => [1453374654941-3)]
>> [info] Stored => [1453374654941-4)]
>> [info] processed => [List(1453374654941-2)]
>> [info] Stored => [1453374654941-5)]
>> [info] Stored => [1453374654941-6)]
>> [info] processed => [List(1453374654941-3)]
>> [info] Stored => [1453374654941-7)]
>> [info] Stored => [1453374654941-8)]
>> [info] processed => [List(1453374654941-4)]
>> [info] Stored => [1453374654941-9)]
>> [info] Stored => [1453374654941-10)]
>>
>> As you would expect, the storing is outpacing the processing. So I kill
>> the application and restart it. This time I commented out the sleep in the
>> foreachRDD so that the processing can clear any backlog:
>>
>> [info] Stored => [1453374753946-0)]
>> [info] processed => [List(1453374753946-0)]
>> [info] Stored => [1453374753946-1)]
>> [info] processed => [List(1453374753946-1)]
>> [info] Stored => [1453374753946-2)]
>> [info] processed => [List(1453374753946-2)]
>> [info] Stored => [1453374753946-3)]
>> [info] processed => [List(1453374753946-3)]
>> [info] Stored => [1453374753946-4)]
>> [info] processed => [List(1453374753946-4)]
>>
>> As you can see the new events are processed but none from the previous
>> batch. The old WAL logs are cleared and I see log messages like this but
>> the old data does not get processed.
>>
>> INFO WriteAheadLogManager : Recovered 1 write ahead log files from 
>> hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0
>>
>> What am I doing wrong? I am using Spark 1.5.2.
>>
>> Best regards,
>>
>> Patrick
>>
>
>


Spark Streaming Write Ahead Log (WAL) not replaying data after restart

2016-01-21 Thread Patrick McGloin
Hi all,

To have a simple way of testing the Spark Streaming Write Ahead Log I
created a very simple Custom Input Receiver, which will generate strings
and store those:

class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  val batchID = System.currentTimeMillis()

  def onStart() {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while(true) {
          //http://spark.apache.org/docs/latest/streaming-custom-receivers.html
          //To implement a reliable receiver, you have to use store(multiple-records) to store data.
          store(ArrayBuffer(s"$batchID-$i"))
          println(s"Stored => [$batchID-$i)]")
          Thread.sleep(1000L)
          i = i + 1
        }
      }
    }.start()
  }

  def onStop() {}
}

I then created a simple Application which will use the Custom Receiver to
stream the data and process it:

object DStreamResilienceTest extends App {

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("DStreamResilienceTest")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
  val customReceiverStream: ReceiverInputDStream[String] =
    ssc.receiverStream(new InMemoryStringReceiver())
  customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
    println(s"processed => [${rdd.collect().toList}]")
    Thread.sleep(2000L)
  }
  ssc.start()
  ssc.awaitTermination()

}

As you can see, the processing of each received RDD has a sleep of 2 seconds
while the Strings are stored every second. This creates a backlog and the
new strings pile up, and should be stored in the WAL. Indeed, I can see the
files in the checkpoint dirs getting updated. Running the app I get output
like this:

[info] Stored => [1453374654941-0)]
[info] processed => [List(1453374654941-0)]
[info] Stored => [1453374654941-1)]
[info] Stored => [1453374654941-2)]
[info] processed => [List(1453374654941-1)]
[info] Stored => [1453374654941-3)]
[info] Stored => [1453374654941-4)]
[info] processed => [List(1453374654941-2)]
[info] Stored => [1453374654941-5)]
[info] Stored => [1453374654941-6)]
[info] processed => [List(1453374654941-3)]
[info] Stored => [1453374654941-7)]
[info] Stored => [1453374654941-8)]
[info] processed => [List(1453374654941-4)]
[info] Stored => [1453374654941-9)]
[info] Stored => [1453374654941-10)]

As you would expect, the storing is outpacing the processing. So I kill
the application and restart it. This time I commented out the sleep in the
foreachRDD so that the processing can clear any backlog:

[info] Stored => [1453374753946-0)]
[info] processed => [List(1453374753946-0)]
[info] Stored => [1453374753946-1)]
[info] processed => [List(1453374753946-1)]
[info] Stored => [1453374753946-2)]
[info] processed => [List(1453374753946-2)]
[info] Stored => [1453374753946-3)]
[info] processed => [List(1453374753946-3)]
[info] Stored => [1453374753946-4)]
[info] processed => [List(1453374753946-4)]

As you can see the new events are processed but none from the previous
batch. The old WAL logs are cleared and I see log messages like this but
the old data does not get processed.

INFO WriteAheadLogManager : Recovered 1 write ahead log files from
hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0

What am I doing wrong? I am using Spark 1.5.2.

Best regards,

Patrick


Re: DataFrame partitionBy to a single Parquet file (per partition)

2016-01-15 Thread Patrick McGloin
I will try this on Monday. Thanks for the tip.

On Fri, 15 Jan 2016, 18:58 Cheng Lian <lian.cs@gmail.com> wrote:

> You may try DataFrame.repartition(partitionExprs: Column*) to shuffle all
> data belonging to a single (data) partition into a single (RDD) partition:
>
> df.coalesce(1).repartition("entity", "year", "month", "day", 
> "status").write.partitionBy("entity", "year", "month", "day", 
> "status").mode(SaveMode.Append).parquet(s"$location")
>
> (Unfortunately the naming here can be quite confusing.)
>
>
> Cheng
>
>
> On 1/14/16 11:48 PM, Patrick McGloin wrote:
>
> Hi,
>
> I would like to reparation / coalesce my data so that it is saved into one
> Parquet file per partition. I would also like to use the Spark SQL
> partitionBy API. So I could do that like this:
>
> df.coalesce(1).write.partitionBy("entity", "year", "month", "day", 
> "status").mode(SaveMode.Append).parquet(s"$location")
>
> I've tested this and it doesn't seem to perform well. This is because
> there is only one partition to work on in the dataset and all the
> partitioning, compression and saving of files has to be done by one CPU
> core.
>
> I could rewrite this to do the partitioning manually (using filter with
> the distinct partition values for example) before calling coalesce.
>
> But is there a better way to do this using the standard Spark SQL API?
>
> Best regards,
>
> Patrick
>
>
>
>


DataFrame partitionBy to a single Parquet file (per partition)

2016-01-14 Thread Patrick McGloin
Hi,

I would like to repartition / coalesce my data so that it is saved into one
Parquet file per partition. I would also like to use the Spark SQL
partitionBy API. So I could do that like this:

df.coalesce(1).write.partitionBy("entity", "year", "month", "day",
"status").mode(SaveMode.Append).parquet(s"$location")

I've tested this and it doesn't seem to perform well. This is because there
is only one partition to work on in the dataset and all the partitioning,
compression and saving of files has to be done by one CPU core.

I could rewrite this to do the partitioning manually (using filter with the
distinct partition values for example) before calling coalesce.
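
For reference, a rough sketch of that manual approach (Spark 1.x DataFrame
API; the column types and the use of getAs are assumptions, and it assumes
the number of distinct partition values is small enough to collect):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

val partitionCols = Seq("entity", "year", "month", "day", "status")
val distinctPartitions = df.select(partitionCols.map(col): _*).distinct().collect()

distinctPartitions.foreach { p =>
  // Build a filter matching exactly one output partition, then write it with
  // a single task so each partition directory ends up with one Parquet file.
  val predicate = partitionCols.map(c => col(c) === p.getAs[Any](c)).reduce(_ && _)
  df.filter(predicate)
    .coalesce(1)
    .write
    .partitionBy(partitionCols: _*)
    .mode(SaveMode.Append)
    .parquet(s"$location")
}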

But is there a better way to do this using the standard Spark SQL API?

Best regards,

Patrick


Re: Data in one partition after reduceByKey

2015-11-23 Thread Patrick McGloin
I will answer my own question, since I figured it out.  Here is my answer
in case anyone else has the same issue.

My DateTimes were all without seconds and milliseconds since I wanted to
group data belonging to the same minute. The difference between the hashCode()
values of Joda DateTimes which are one minute apart is a constant:

scala> val now = DateTime.now
now: org.joda.time.DateTime = 2015-11-23T11:14:17.088Z

scala> now.withSecondOfMinute(0).withMillisOfSecond(0).hashCode -
now.minusMinutes(1).withSecondOfMinute(0).withMillisOfSecond(0).hashCode
res42: Int = 6

As can be seen by this example, if the hashCode values are similarly
spaced, they can end up in the same partition:

scala> val nums = for(i <- 0 to 100) yield ((i*20 % 1000), i)
nums: scala.collection.immutable.IndexedSeq[(Int, Int)] =
Vector((0,0), (20,1), (40,2), (60,3), (80,4), (100,5), (120,6),
(140,7), (160,8), (180,9), (200,10), (220,11), (240,12), (260,13),
(280,14), (300,15), (320,16), (340,17), (360,18), (380,19), (400,20),
(420,21), (440,22), (460,23), (480,24), (500,25), (520,26), (540,27),
(560,28), (580,29), (600,30), (620,31), (640,32), (660,33), (680,34),
(700,35), (720,36), (740,37), (760,38), (780,39), (800,40), (820,41),
(840,42), (860,43), (880,44), (900,45), (920,46), (940,47), (960,48),
(980,49), (0,50), (20,51), (40,52), (60,53), (80,54), (100,55),
(120,56), (140,57), (160,58), (180,59), (200,60), (220,61), (240,62),
(260,63), (280,64), (300,65), (320,66), (340,67), (360,68), (380,69),
(400,70), (420,71), (440,72), (460,73), (480,74), (500...

scala> val rddNum = sc.parallelize(nums)
rddNum: org.apache.spark.rdd.RDD[(Int, Int)] =
ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val reducedNum = rddNum.reduceByKey(_+_)
reducedNum: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at
reduceByKey at <console>:25

scala> reducedNum.mapPartitions(iter => Array(iter.size).iterator,
true).collect.toList

res2: List[Int] = List(50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0)

To distribute my data more evenly across the partitions I created my own
custom Partitoiner:

class JodaPartitioner(rddNumPartitions: Int) extends Partitioner {
  def numPartitions: Int = rddNumPartitions
  def getPartition(key: Any): Int = {
key match {
  case dateTime: DateTime =>
val sum = dateTime.getYear + dateTime.getMonthOfYear +
dateTime.getDayOfMonth + dateTime.getMinuteOfDay  +
dateTime.getSecondOfDay
sum % numPartitions
  case _ => 0
}
  }
}
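
Usage is then just a matter of passing the partitioner to reduceByKey; a
minimal sketch, using the same RDD names as below:

// reduceByKey has an overload that takes an explicit Partitioner, so keys
// that are one minute apart no longer collapse into the same partition.
val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(new JodaPartitioner(16), _ + _)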


On 20 November 2015 at 17:17, Patrick McGloin <mcgloin.patr...@gmail.com>
wrote:

> Hi,
>
> I have Spark application which contains the following segment:
>
> val reparitioned = rdd.repartition(16)
> val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned, startDate, endDate)
> val mapped: RDD[(DateTime, myData)] = filtered.map(kv => (kv._1.processingTime, kv._2))
> val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_)
>
> When I run this with some logging this is what I see:
>
> reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512, 2508, 
> 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)]
> filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 
> 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
> mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 
> 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
> reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)]
>
> My logging is done using these two lines:
>
> val sizes: RDD[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, true)
> log.info(s"rdd ==> [${sizes.collect.toList}]")
>
> My question is why does my data end up in one partition after the
> reduceByKey? After the filter it can be seen that the data is evenly
> distributed, but the reduceByKey results in data in only one partition.
>
> Thanks,
>
> Patrick
>


Data in one partition after reduceByKey

2015-11-20 Thread Patrick McGloin
Hi,

I have Spark application which contains the following segment:

val reparitioned = rdd.repartition(16)
val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned, startDate, endDate)
val mapped: RDD[(DateTime, myData)] = filtered.map(kv => (kv._1.processingTime, kv._2))
val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_)

When I run this with some logging this is what I see:

reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512,
2508, 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)]
filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076,
2042, 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076,
2042, 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)]

My logging is done using these two lines:

val sizes: RDD[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, true)
log.info(s"rdd ==> [${sizes.collect.toList}]")

My question is why does my data end up in one partition after the
reduceByKey? After the filter it can be seen that the data is evenly
distributed, but the reduceByKey results in data in only one partition.

Thanks,

Patrick


Re: Spark UI consuming lots of memory

2015-10-27 Thread Patrick McGloin
Hi Nicholas,

I think you are right about the issue relating to Spark-11126, I'm seeing
it as well.

Did you find any workaround?  Looking at the pull request for the fix it
doesn't look possible.

Best regards,
Patrick

On 15 October 2015 at 19:40, Nicholas Pritchard <
nicholas.pritch...@falkonry.com> wrote:

> Thanks for your help, most likely this is the memory leak you are fixing
> in https://issues.apache.org/jira/browse/SPARK-11126.
> -Nick
>
> On Mon, Oct 12, 2015 at 9:00 PM, Shixiong Zhu  wrote:
>
>> In addition, you cannot turn off JobListener and SQLListener now...
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-10-13 11:59 GMT+08:00 Shixiong Zhu :
>>
>>> Is your query very complicated? Could you provide the output of
>>> `explain` your query that consumes an excessive amount of memory? If this
>>> is a small query, there may be a bug that leaks memory in SQLListener.
>>>
>>> Best Regards,
>>> Shixiong Zhu
>>>
>>> 2015-10-13 11:44 GMT+08:00 Nicholas Pritchard <
>>> nicholas.pritch...@falkonry.com>:
>>>
 As an update, I did try disabling the ui with "spark.ui.enabled=false",
 but the JobListener and SQLListener still consume a lot of memory, leading
 to OOM error. Has anyone encountered this before? Is the only solution just
 to increase the driver heap size?

 Thanks,
 Nick

 On Mon, Oct 12, 2015 at 8:42 PM, Nicholas Pritchard <
 nicholas.pritch...@falkonry.com> wrote:

> I set those configurations by passing to spark-submit script:
> "bin/spark-submit --conf spark.ui.retainedJobs=20 ...". I have verified
> that these configurations are being passed correctly because they are
> listed in the environments tab and also by counting the number of
> job/stages that are listed. The "spark.sql.ui.retainedExecutions=0"
> only applies to the number of "completed" executions; there will always be
> a "running" execution. For some reason, I have one execution that consumes
> an excessive amount of memory.
>
> Actually, I am not interested in the SQL UI, as I find the Job/Stages
> UI to have sufficient information. I am also using Spark Standalone 
> cluster
> manager so have not had to use the history server.
>
>
> On Mon, Oct 12, 2015 at 8:17 PM, Shixiong Zhu 
> wrote:
>
>> Could you show how did you set the configurations? You need to set
>> these configurations before creating SparkContext and SQLContext.
>>
>> Moreover, the history sever doesn't support SQL UI. So
>> "spark.eventLog.enabled=true" doesn't work now.
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-10-13 2:01 GMT+08:00 pnpritchard <
>> nicholas.pritch...@falkonry.com>:
>>
>>> Hi,
>>>
>>> In my application, the Spark UI is consuming a lot of memory,
>>> especially the
>>> SQL tab. I have set the following configurations to reduce the memory
>>> consumption:
>>> - spark.ui.retainedJobs=20
>>> - spark.ui.retainedStages=40
>>> - spark.sql.ui.retainedExecutions=0
>>>
>>> However, I still get OOM errors in the driver process with the
>>> default 1GB
>>> heap size. The following link is a screen shot of a heap dump report,
>>> showing the SQLListener instance having a retained size of 600MB.
>>>
>>> https://cloud.githubusercontent.com/assets/5124612/10404379/20fbdcfc-6e87-11e5-9415-27e25193a25c.png
>>>
>>> Rather than just increasing the allotted heap size, does anyone have
>>> any
>>> other ideas? Is it possible to disable the SQL tab specifically? I
>>> also
>>> thought about serving the UI from disk rather than memory with
>>> "spark.eventLog.enabled=true" and "spark.ui.enabled=false". Has
>>> anyone tried
>>> this before?
>>>
>>> Thanks,
>>> Nick
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-consuming-lots-of-memory-tp25033.html
>>> Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>

>>>
>>
>


Fwd: [akka-user] Akka Camel plus Spark Streaming

2014-10-27 Thread Patrick McGloin
Hi Spark Users,

I am trying to use the Akka Camel library together with Spark Streaming and
it is not working when I deploy my Spark application to the Spark Cluster.
It does work when I run the application locally so this seems to be an
issue with how Spark loads the reference.conf file from the Akka Camel jar
when the application gets deployed to the cluster.

I have tried to make a simple application to demonstrate the problem.  It
has a class which uses Akka Camel to create an Actor Based Receiver for
Spark Streaming:

class NettyReceiver[T: ClassTag](port: Int) extends Consumer with ActorHelper {
  def endpointUri = "netty:tcp://localhost:" + port
  def receive = {
    case data: T => store(data)
  }
}

And the application creates a DStream using the previous class:

object SparkCamelSBT extends App {
  val sparkConf = new SparkConf().setAppName("RawTRCTransformer")
  val ssc = new StreamingContext(sparkConf, Milliseconds(5000))

  case class Data(data: String)

  val dStream = ssc.actorStream[Data](Props(new NettyReceiver[Data](4548)),
    "TRCNettyReceiver")

  dStream.print()

  ssc.start()
  ssc.awaitTermination()

}

In local mode this works.  When deployed to the Spark Cluster the following
error is logged by the worker who tries to use Akka Camel:



-- Forwarded message --
From: Patrick McGloin mcgloin.patr...@gmail.com
Date: 24 October 2014 15:09
Subject: Re: [akka-user] Akka Camel plus Spark Streaming
To: akka-u...@googlegroups.com


Hi Patrik,

Thanks for your response.  Based on what you said, I tried a couple more
things:

- I tried copying the Akka Camel part of the reference.conf to my
application in case it would try to read it from there.
- I tried calling addJar from the SparkContext to load the
akka-camel_2.10-2.3.6.jar
file.
- I tried adding akka-camel_2.10-2.3.6.jar to the SPARK_CLASSPATH and
restarting the Spark Master and its Workers.

With the third item my thinking was that as Spark had already started Akka
the Akka Camel jar needed to be there at the start and not supplied when a
job was to be started on the Spark Cluster.

None of these worked however.

Thanks for your help.  I will try and boil this down to a very simple
example on my laptop and try to reproduce it.  If it's still a problem in
its most basic form I'll ask the Spark group if they know how it should
work.

Best regards,
Patrick



On 24 October 2014 13:36, Patrik Nordwall patrik.nordw...@gmail.com wrote:

 If you are not using OSGi you should not bother about my comment related
 to akka-osgi jar.

 If you have several jar files in your classpath the reference.conf files
 are merged automatically. If you package your app as a fat jar you must
 make sure that all reference.conf files are merged into one big
 reference.conf file when you assemble the fat jar.
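
For reference, a hypothetical build.sbt fragment showing the kind of merge
Patrik describes (sbt-assembly 0.12+/sbt 0.13 syntax; note that recent
sbt-assembly versions already concatenate reference.conf by default, so the
more common culprit is an override such as MergeStrategy.first):

assemblyMergeStrategy in assembly := {
  // Concatenate all reference.conf files so the akka.camel.* settings survive.
  case "reference.conf" => MergeStrategy.concat
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}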

 The error you see indicates that the camel section is not included in the
 reference.conf file(s) that you have in the classpath when you run.

 I don't know if Spark is doing anything special that would break the
 loading of the reference.conf files.

 Regards,
 Patrik

 On Fri, Oct 24, 2014 at 11:59 AM, Patrick McGloin 
 mcgloin.patr...@gmail.com wrote:

 Hi Akka users,

 I am trying to use Akka Camel together with Spark Streaming and I am
 getting this error message:

 14/10/23 09:31:30 ERROR OneForOneStrategy: No configuration setting found
 for key 'akka.camel'
 akka.actor.ActorInitializationException: exception during creation

 I have followed the pattern for creating an Actor based receiver:

 http://spark.apache.org/docs/latest/streaming-custom-receivers.html

 My Actor looks like this:

 class NettyReceiver[T: ClassTag](port: Int) extends Consumer with ActorHelper {
   def endpointUri = "netty:tcp://xyz:" + port
   def receive = {
     case data: T => store(data)
   }
 }

 And I create a DStream like this:

 val dstream = ssc.actorStream[MessageType](Props(new NettyReceiver[MessageType](4548)), "msgNettyReceiver")

 All good so far.  I use sbt assembly and sbt package to create jar files
 for the project and the application and I run it on the server using this
 command:

 sudo ./spark-submit --class SparkStreamingCamelApp --master
 spark://xyz:7077 --jars  /opt/app/bigProject.jar --total-executor-cores
 3 /opt/app/smallApplication.jar

 The streaming application runs without errors but in the Spark worker log
 I see these errors:

 akka.actor.ActorInitializationException: exception during creation
 Caused by: java.lang.reflect.InvocationTargetException
 Caused by: akka.actor.InvalidActorNameException: actor name
 [camel-supervisor] is not unique!
 14/10/23 09:31:30 ERROR OneForOneStrategy: No configuration setting found
 for key 'akka.camel'
 akka.actor.ActorInitializationException: exception during creation

 I have researched the issue and found that Patrik Nordwall said this
 issue indicates that the reference.conf for akka-camel is not loaded:

 http://grokbase.com/t/gg/akka-user/13bp25kd7f/akka-camel-osgi

 If I run the following

Re: [akka-user] Akka Camel plus Spark Streaming

2014-10-27 Thread Patrick McGloin
Apologies, was not finished typing.

14/10/23 09:31:30 ERROR OneForOneStrategy: No configuration setting found
for key 'akka.camel'
akka.actor.ActorInitializationException: exception during creation

This error means that the Akka Camel reference.conf is not being loaded,
even though it is in the assembled jar file.  Please see the mails below,
which I sent to the Akka group for details.

Is there something I am doing wrong?  Is there a way to get the Akka
Cluster to load the reference.conf from Camel?

Any help greatly appreciated!

Best regards,
Patrick


On 27 October 2014 11:33, Patrick McGloin mcgloin.patr...@gmail.com wrote:

 Hi Spark Users,

 I am trying to use the Akka Camel library together with Spark Streaming
 and it is not working when I deploy my Spark application to the Spark
 Cluster.  It does work when I run the application locally so this seems to
 be an issue with how Spark loads the reference.conf file from the Akka
 Camel jar when the application gets deployed to the cluster.

 I have tried to make a simple application to demonstrate the problem.  It
 has a class which uses Akka Camel to create an Actor Based Receiver for
 Spark Streaming:

 class NettyReceiver[T: ClassTag](port: Int) extends Consumer with ActorHelper {
   def endpointUri = "netty:tcp://localhost:" + port
   def receive = {
     case data: T => store(data)
   }
 }

 And the application creates a DStream using the previous class:

 object SparkCamelSBT extends App {
   val sparkConf = new SparkConf().setAppName("RawTRCTransformer")
   val ssc = new StreamingContext(sparkConf, Milliseconds(5000))

   case class Data(data: String)

   val dStream = ssc.actorStream[Data](Props(new NettyReceiver[Data](4548)),
     "TRCNettyReceiver")

   dStream.print()

   ssc.start()
   ssc.awaitTermination()

 }

 In local mode this works.  When deployed to the Spark Cluster the
 following error is logged by the worker who tries to use Akka Camel:



 -- Forwarded message --
 From: Patrick McGloin mcgloin.patr...@gmail.com
 Date: 24 October 2014 15:09
 Subject: Re: [akka-user] Akka Camel plus Spark Streaming
 To: akka-u...@googlegroups.com


 Hi Patrik,

 Thanks for your response.  Based on what you said, I tried a couple more
 things:

 - I tried copying the Akka Camel part of the reference.conf to my
 application in case it would try to read it from there.
 - I tried calling addJar from the SparkContext to load the 
 akka-camel_2.10-2.3.6.jar
 file.
 - I tried adding akka-camel_2.10-2.3.6.jar to the SPARK_CLASSPATH and
 restarting the Spark Master and its Workers.

 With the third item my thinking was that as Spark had already started Akka
 the Akka Camel jar needed to be there at the start and not supplied when a
 job was to be started on the Spark Cluster.

 None of these worked however.

 Thanks for your help.  I will try and boil this down to a very simple
 example on my laptop and try and reproduce it.  If its still a problem in
 its most basic form I'll ask the Spark group if they know how it should
 work.

 Best regards,
 Patrick



 On 24 October 2014 13:36, Patrik Nordwall patrik.nordw...@gmail.com
 wrote:

 If you are not using OSGi you should not bother about my comment related
 to akka-osgi jar.

 If you have several jar files in your classpath the reference.conf files
 are merged automatically. If you package your app as a fat jar you must
 make sure that all reference.conf files are merged into one big
 reference.conf file when you assemble the fat jar.

 The error you see indicates that the camel section is not included in the
 reference.conf file(s) that you have in the classpath when you run.

 I don't know if Spark is doing anything special that would break the
 loading of the reference.conf files.

 Regards,
 Patrik

 On Fri, Oct 24, 2014 at 11:59 AM, Patrick McGloin 
 mcgloin.patr...@gmail.com wrote:

 Hi Akka users,

 I am trying to use Akka Camel together with Spark Streaming and I am
 getting this error message:

 14/10/23 09:31:30 ERROR OneForOneStrategy: No configuration setting
 found for key 'akka.camel'
 akka.actor.ActorInitializationException: exception during creation

 I have followed the pattern for creating an Actor based receiver:

 http://spark.apache.org/docs/latest/streaming-custom-receivers.html

 My Actor looks like this:

 class NettyReceiver[T: ClassTag](port: Int) extends Consumer with ActorHelper {
   def endpointUri = "netty:tcp://xyz:" + port
   def receive = {
     case data: T => store(data)
   }
 }

 And I create a DStream like this:

 val dstream = ssc.actorStream[MessageType](Props(new NettyReceiver[MessageType](4548)), "msgNettyReceiver")

 All good so far.  I use sbt assembly and sbt package to create jar files
 for the project and the application and I run it on the server using this
 command:

 sudo ./spark-submit --class SparkStreamingCamelApp --master
 spark://xyz:7077 --jars  /opt/app/bigProject.jar --total-executor-cores
 3 /opt/app/smallApplication.jar

 The streaming

Re: Spark SQL + Hive + JobConf NoClassDefFoundError

2014-10-01 Thread Patrick McGloin
FYI, in case anybody else has this problem, we switched to Spark 1.1
(outside CDH) and the same Spark application worked first time (once
recompiled with Spark 1.1 libs of course).  I assume this is because Spark
1.1 is compiled with Hive.
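
A hypothetical build.sbt fragment for the recompile against Spark 1.1 might
look like this (artifact names are real Spark artifacts; versions and scopes
are illustrative, assuming the cluster's Spark 1.1 assembly was built with
Hive support):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.1.0" % "provided",
  // spark-hive brings in the Hive support that the CDH Spark 1.0 build lacked
  "org.apache.spark" %% "spark-hive" % "1.1.0" % "provided"
)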

On 29 September 2014 17:41, Patrick McGloin mcgloin.patr...@gmail.com
wrote:

 Hi,

 I have an error when submitting a Spark SQL application to our Spark
 cluster:

 14/09/29 16:02:11 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.NoClassDefFoundError
 *java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf*
 at
 org.apache.spark.sql.hive.SparkHiveHadoopWriter.setIDs(SparkHadoopWriter.scala:169)
 at
 org.apache.spark.sql.hive.SparkHiveHadoopWriter.setup(SparkHadoopWriter.scala:69)
 at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org
 $apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(hiveOperators.scala:260)
 at
 org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(hiveOperators.scala:274)
 at
 org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(hiveOperators.scala:274)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 I assume this is because the Executor does not have the hadoop-core.jar
 file.  I've tried adding it to the SparkContext using addJar but this
 didn't help.

 I also see that the documentation says you must rebuild Spark if you want
 to use Hive.

 https://spark.apache.org/docs/1.0.2/sql-programming-guide.html#hive-tables

 Is this really true or can we just package the jar files with the Spark
 Application we build?  Rebuilding Spark itself isn't possible for us as it
 is installed on a VM without internet access and we are using the Cloudera
 distribution (Spark 1.0).

 Is it possible to assemble the Hive dependencies into our Spark
 Application and submit this to the cluster?  I've tried to do this with
 spark-submit (and the Hadoop JobConf class is in AAC-assembly-1.0.jar) but
 the Executor doesn't find the class.  Here is the command:

 sudo ./spark-submit --class aac.main.SparkDriver --master
 spark://localhost:7077 --jars AAC-assembly-1.0.jar aacApp_2.10-1.0.jar

 Any pointers would be appreciated!

 Best regards,
 Patrick





Spark SQL + Hive + JobConf NoClassDefFoundError

2014-09-29 Thread Patrick McGloin
Hi,

I have an error when submitting a Spark SQL application to our Spark
cluster:

14/09/29 16:02:11 WARN scheduler.TaskSetManager: Loss was due to
java.lang.NoClassDefFoundError
*java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf*
at
org.apache.spark.sql.hive.SparkHiveHadoopWriter.setIDs(SparkHadoopWriter.scala:169)
at
org.apache.spark.sql.hive.SparkHiveHadoopWriter.setup(SparkHadoopWriter.scala:69)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org
$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(hiveOperators.scala:260)
at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(hiveOperators.scala:274)
at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(hiveOperators.scala:274)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I assume this is because the Executor does not have the hadoop-core.jar
file.  I've tried adding it to the SparkContext using addJar but this
didn't help.
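
One alternative I can think of is pointing the executors at a copy of the
jar that already exists on each node -- just a sketch, assuming
spark.executor.extraClassPath is honoured by our Spark 1.0 / CDH setup and
that the path below (which is made up) exists on every worker:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical worker-local path to the jar containing org.apache.hadoop.mapred.JobConf.
val conf = new SparkConf()
  .setAppName("AAC")
  .set("spark.executor.extraClassPath", "/opt/hadoop/lib/hadoop-core.jar")
val sc = new SparkContext(conf)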

I also see that the documentation says you must rebuild Spark if you want
to use Hive.

https://spark.apache.org/docs/1.0.2/sql-programming-guide.html#hive-tables

Is this really true or can we just package the jar files with the Spark
Application we build?  Rebuilding Spark itself isn't possible for us as it
is installed on a VM without internet access and we are using the Cloudera
distribution (Spark 1.0).

Is it possible to assemble the Hive dependencies into our Spark Application
and submit this to the cluster?  I've tried to do this with spark-submit
(and the Hadoop JobConf class is in AAC-assembly-1.0.jar) but the Executor
doesn't find the class.  Here is the command:

sudo ./spark-submit --class aac.main.SparkDriver --master
spark://localhost:7077 --jars AAC-assembly-1.0.jar aacApp_2.10-1.0.jar
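
To be clear about what we mean by assembling, this is roughly the shape of
the build we have in mind (a sketch only -- sbt-assembly, with placeholder
artifact versions that would have to be matched to the CDH cluster):

// build.sbt (sketch): pull the SQL/Hive and Hadoop client classes into the
// application assembly so a single fat jar can be submitted.  Versions are
// placeholders.
libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"     % "1.0.0",
  "org.apache.spark"  %% "spark-hive"    % "1.0.0",
  "org.apache.hadoop"  % "hadoop-client" % "2.3.0"  // provides org.apache.hadoop.mapred.JobConf
)

// and then, for example:
//   spark-submit --class aac.main.SparkDriver --master spark://localhost:7077 AAC-assembly-1.0.jar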

Any pointers would be appreciated!

Best regards,
Patrick


Re: type issue: found RDD[T] expected RDD[A]

2014-08-19 Thread Patrick McGloin
Hi Amit,

I think the type of the data contained in your RDD needs to be a concrete
case class, not an abstract type, for createSchemaRDD to work.  This makes
sense when you consider that it needs to know about the fields in the object
to create the schema.

I had the same issue when I used an abstract base class for a collection of
types.
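
To illustrate, here is a sketch of how the class signature could be
tightened so that createSchemaRDD's bounds are satisfied.  It assumes the
Spark 1.0 SQLContext API (where createSchemaRDD needs A <: Product plus a
TypeTag) and folds your atLocation/withName builders into plain parameters
to keep the example short:

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SchemaRDD}

// Bounding T by Product (which every case class extends) and carrying a TypeTag
// gives createSchemaRDD enough information to derive the schema from T's fields.
class SparkTable[T <: Product : TypeTag : ClassTag](
    val sqlContext: SQLContext,
    val extractor: String => T) {

  private[this] val sc = sqlContext.sparkContext

  def makeRDD(location: String, tableName: String, sqlQuery: String): SchemaRDD = {
    import sqlContext._
    val rddT: RDD[T] = sc.textFile(location).map(extractor)
    val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
    schemaRDD.registerAsTable(tableName)
    sqlContext.sql(sqlQuery)
  }
}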

Best regards,
Patrick


On 6 August 2014 07:58, Amit Kumar kumarami...@gmail.com wrote:

 Hi All,

 I am having some trouble trying to write generic code that uses sqlContext
 and RDDs. Can you suggest what might be wrong?

  class SparkTable[T : ClassTag](val sqlContext: SQLContext, val extractor: (String) => T) {

    private[this] var location: Option[String] = None
    private[this] var name: Option[String] = None
    private[this] val sc = sqlContext.sparkContext
    ...

    def makeRDD(sqlQuery: String): SchemaRDD = {
      require(this.location != None)
      require(this.name != None)
      import sqlContext._
      val rdd: RDD[String] = sc.textFile(this.location.get)
      val rddT: RDD[T] = rdd.map(extractor)
      val schemaRDD: SchemaRDD = createSchemaRDD(rddT)
      schemaRDD.registerAsTable(name.get)
      val all = sqlContext.sql(sqlQuery)
      all
    }

  }

 I use it as below:

  def extractor(line: String): POJO = {
    val splits = line.split(pattern).toList
    POJO(splits(0), splits(1), splits(2), splits(3))
  }

  val pojoTable: SparkTable[POJO] = new SparkTable[POJO](sqlContext, extractor)

  val identityData: SchemaRDD =
    pojoTable.atLocation("hdfs://location/table")
      .withName("pojo")
      .makeRDD("SELECT * FROM pojo")


 I get compilation failure

 inferred type arguments [T] do not conform to method createSchemaRDD's
 type parameter bounds [A : Product]
 [error] val schemaRDD:SchemaRDD= createSchemaRDD(rddT)
 [error]  ^
 [error]  SparkTable.scala:37: type mismatch;
 [error]  found   : org.apache.spark.rdd.RDD[T]
 [error]  required: org.apache.spark.rdd.RDD[A]
 [error] val schemaRDD:SchemaRDD= createSchemaRDD(rddT)
 [error]  ^
 [error] two errors found

 I am probably missing something basic either in scala reflection/types or
 implicits?

 Any hints would be appreciated.

 Thanks
 Amit





Re: Spark SQL, Parquet and Impala

2014-08-02 Thread Patrick McGloin
Hi Michael,

Thanks for your reply.  Is this the correct way to load data from Spark
into Parquet?  Somehow it doesn't feel right.  When we followed the steps
described for storing the data into Hive tables everything was smooth: we
used HiveContext and the table is automatically recognised by Hive (and
Impala).

When we loaded the data into Parquet using the method I described, we used
both SQLContext and HiveContext.  We had to manually define the table using
CREATE EXTERNAL TABLE in Hive.  Then we have to refresh the table to see
changes.

So the problem isn't just the refresh; it's that we're unsure of the best
practice for loading data into Parquet tables.  Is the way we are doing the
Spark part correct in your opinion?
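
For what it's worth, the kind of alternative we are wondering about looks
like this -- only a sketch: the table and column names are made up,
eventsDStream is the stream from the snippets further down, and the exact
Parquet DDL depends on the Hive version on the cluster (older Hive needs
the explicit Parquet SerDe rather than the STORED AS PARQUET shorthand):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext._

// Define a managed, Parquet-backed table once through the metastore, so Hive and
// Impala both discover it there instead of via a hand-written CREATE EXTERNAL TABLE.
hql("CREATE TABLE IF NOT EXISTS parq_events (id INT, payload STRING) STORED AS PARQUET")

// Assumes the stream carries a case class such as Event(id: Int, payload: String);
// createSchemaRDD comes from the hiveContext import above.
eventsDStream.foreachRDD { rdd =>
  createSchemaRDD(rdd).insertInto("parq_events")
}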

Best regards,
Patrick






On 1 August 2014 19:32, Michael Armbrust mich...@databricks.com wrote:

 So is the only issue that impala does not see changes until you refresh
 the table?  This sounds like a configuration that needs to be changed on
 the impala side.


 On Fri, Aug 1, 2014 at 7:20 AM, Patrick McGloin mcgloin.patr...@gmail.com
  wrote:

 Sorry, sent early, wasn't finished typing.

 CREATE EXTERNAL TABLE 

 Then we can select the data using Impala.  But this is registered as an
 external table and must be refreshed if new data is inserted.

 Obviously this doesn't seem good and doesn't seem like the correct
 solution.

 How should we insert data from SparkSQL into a Parquet table which can be
 directly queried by Impala?

 Best regards,
 Patrick


 On 1 August 2014 16:18, Patrick McGloin mcgloin.patr...@gmail.com
 wrote:

 Hi,

 We would like to use Spark SQL to store data in Parquet format and then
 query that data using Impala.

 We've tried to come up with a solution and it is working but it doesn't
 seem good.  So I was wondering if you guys could tell us what is the
 correct way to do this.  We are using Spark 1.0 and Impala 1.3.1.

 First we are registering our tables using SparkSQL:

 val sqlContext = new SQLContext(sc)
 sqlContext.createParquetFile[ParqTable]("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt", true)

 Then we are using the HiveContext to register the table and do the
 insert:

 val hiveContext = new HiveContext(sc)
 import hiveContext._

 hiveContext.parquetFile("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt").registerAsTable("ParqTable")
 eventsDStream.foreachRDD(event => event.insertInto("ParqTable"))

 Now we have the data stored in a Parquet file.  To access it in Hive or
 Impala we run






Spark SQL, Parquet and Impala

2014-08-01 Thread Patrick McGloin
Hi,

We would like to use Spark SQL to store data in Parquet format and then
query that data using Impala.

We've tried to come up with a solution and it is working but it doesn't
seem good.  So I was wondering if you guys could tell us what is the
correct way to do this.  We are using Spark 1.0 and Impala 1.3.1.

First we are registering our tables using SparkSQL:

val sqlContext = new SQLContext(sc)
sqlContext.createParquetFile[ParqTable]("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt", true)

Then we are using the HiveContext to register the table and do the insert:

val hiveContext = new HiveContext(sc)
import hiveContext._
hiveContext.parquetFile("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt").registerAsTable("ParqTable")
eventsDStream.foreachRDD(event => event.insertInto("ParqTable"))

Now we have the data stored in a Parquet file.  To access it in Hive or
Impala we run


Re: Spark SQL, Parquet and Impala

2014-08-01 Thread Patrick McGloin
Sorry, sent early, wasn't finished typing.

CREATE EXTERNAL TABLE 

Then we can select the data using Impala.  But this is registered as an
external table and must be refreshed if new data is inserted.

Obviously this doesn't seem good and doesn't seem like the correct solution.

How should we insert data from SparkSQL into a Parquet table which can be
directly queried by Impala?

Best regards,
Patrick


On 1 August 2014 16:18, Patrick McGloin mcgloin.patr...@gmail.com wrote:

 Hi,

 We would like to use Spark SQL to store data in Parquet format and then
 query that data using Impala.

 We've tried to come up with a solution and it is working but it doesn't
 seem good.  So I was wondering if you guys could tell us what is the
 correct way to do this.  We are using Spark 1.0 and Impala 1.3.1.

 First we are registering our tables using SparkSQL:

 val sqlContext = new SQLContext(sc)
 sqlContext.createParquetFile[ParqTable]("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt", true)

 Then we are using the HiveContext to register the table and do the insert:

 val hiveContext = new HiveContext(sc)
 import hiveContext._

 hiveContext.parquetFile("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt").registerAsTable("ParqTable")
 eventsDStream.foreachRDD(event => event.insertInto("ParqTable"))

 Now we have the data stored in a Parquet file.  To access it in Hive or
 Impala we run




Spark Streaming and JMS

2014-05-05 Thread Patrick McGloin
Hi all,

Is there a best practice for subscribing to JMS with Spark Streaming?  I
have searched but not found anything conclusive.

In the absence of a standard practice, the solution I was thinking of was to
use Akka + Camel (akka.camel.Consumer) to create a subscription for a Spark
Streaming custom receiver.  So the actor would look something like this:

class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with Consumer {
  // e.g. jms:sonicmq://localhost:2506/queue?destination=SampleQ1
  def endpointUri = jmsURI
  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

  protected override def onStart() {
    blockGenerator.start
  }

  def receive = {
    case msg: CamelMessage => { blockGenerator += msg.body }
    case _ => { /* ... */ }
  }

  protected override def onStop() {
    blockGenerator.stop
  }
}

And then in the main application create receivers like this:

val ssc = new StreamingContext(...)
object tascQueue extends NetworkInputDStream[String](ssc) {
  override def getReceiver(): NetworkReceiver[String] = {
    new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
  }
}
ssc.registerInputStream(tascQueue)

Is this the best way to go?

Best regards,
Patrick