Re: How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-26 Thread Mike Trienis
Thanks for your response Yana,

I can increase the MaxPermSize parameter and it will allow me to run the
unit test a few more times before I run out of memory.

However, the primary issue is that running the same unit test multiple times
in the same JVM increases memory usage with each run, and I believe it has
something to do with HiveContext not reclaiming memory after it finishes (or
with me not shutting it down properly).

It could very well be related to sbt; however, it's not clear to me.
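One pattern that may help is to create a single SparkContext/HiveContext per
test suite and stop it once the suite finishes, so repeated runs in the same
JVM do not accumulate contexts. A minimal sketch, assuming ScalaTest and Spark
1.x (the class name and query are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class HiveQuerySuite extends FunSuite with BeforeAndAfterAll {

  private var sc: SparkContext = _
  private var hiveContext: HiveContext = _

  override def beforeAll(): Unit = {
    // One context for the whole suite instead of one per test.
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("hive-test"))
    hiveContext = new HiveContext(sc)
  }

  override def afterAll(): Unit = {
    sc.stop()  // release the context so memory can be reclaimed between runs
  }

  test("hive context answers a simple query") {
    assert(hiveContext.sql("SHOW TABLES").count() >= 0)
  }
}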


On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 The PermGen space error is controlled with the MaxPermSize parameter. I run
 with this in my pom, I think copied pretty literally from Spark's own
 tests... I don't know what the sbt equivalent is, but you should be able to
 pass it, possibly via SBT_OPTS?


  <plugin>
    <groupId>org.scalatest</groupId>
    <artifactId>scalatest-maven-plugin</artifactId>
    <version>1.0</version>
    <configuration>
      <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
      <parallel>false</parallel>
      <junitxml>.</junitxml>
      <filereports>SparkTestSuite.txt</filereports>
      <argLine>-Xmx3g -XX:MaxPermSize=256m -XX:ReservedCodeCacheSize=512m</argLine>
      <stderr/>
      <systemProperties>
        <java.awt.headless>true</java.awt.headless>
        <spark.testing>1</spark.testing>
        <spark.ui.enabled>false</spark.ui.enabled>
        <spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
      </systemProperties>
    </configuration>
    <executions>
      <execution>
        <id>test</id>
        <goals>
          <goal>test</goal>
        </goals>
      </execution>
    </executions>
  </plugin>
  </plugins>
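 A rough sbt equivalent of the argLine above, as a sketch (assuming sbt 0.13;
 forking is required for javaOptions to apply to the test JVM):

 fork in Test := true

 javaOptions in Test ++= Seq(
   "-Xmx3g",
   "-XX:MaxPermSize=256m",
   "-XX:ReservedCodeCacheSize=512m")

 Alternatively, exporting the same flags via SBT_OPTS should size the sbt JVM
 itself when tests are not forked.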


 On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hello,

 I am using sbt and created a unit test where I create a `HiveContext`,
 execute some query, and then return. Each time I run the unit test, the JVM's
 memory usage increases until I get the error:

 Internal error when running tests: java.lang.OutOfMemoryError: PermGen
 space
 Exception in thread Thread-2 java.io.EOFException

 As a work-around, I can fork a new JVM each time I run the unit test;
 however, it seems like a bad solution as it takes a while to run the unit
 test.

 By the way, I tried importing the TestHiveContext:

- import org.apache.spark.sql.hive.test.TestHiveContext

 However, it suffers from the same memory issue. Has anyone else run into
 the same problem? Note that I am running these unit tests on my Mac.

 Cheers, Mike.





How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-25 Thread Mike Trienis
Hello,

I am using sbt and created a unit test where I create a `HiveContext`,
execute some query, and then return. Each time I run the unit test, the JVM's
memory usage increases until I get the error:

Internal error when running tests: java.lang.OutOfMemoryError: PermGen space
Exception in thread Thread-2 java.io.EOFException

As a work-around, I can fork a new JVM each time I run the unit test;
however, it seems like a bad solution as it takes a while to run the unit
test.

By the way, I tried importing the TestHiveContext:

   - import org.apache.spark.sql.hive.test.TestHiveContext

However, it suffers from the same memory issue. Has anyone else run into the
same problem? Note that I am running these unit tests on my Mac.

Cheers, Mike.


Spark SQL window functions (RowsBetween)

2015-08-20 Thread Mike Trienis
Hi All,

I would like some clarification regarding window functions for Apache Spark
1.4.0

   -
   
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

In particular, the rowsBetween

   * {{{
   *   val w = Window.partitionBy("name").orderBy("id")
   *   df.select(
   *     sum("price").over(w.rangeBetween(Long.MinValue, 2)),
   *     avg("price").over(w.rowsBetween(0, 4))
   *   )
   * }}}


Are any of the window functions available without a hive context? If the
answer is no, then is there any other way to accomplish this without using
hive?

I need to compare the i-th row with the (i-1)-th row of col2 (sorted by
col1). If item_i of the i-th row and item_(i-1) of the (i-1)-th row are
different, then I need to increment the count of item_(i-1) by 1.


col1 | col2
-----+--------
1    | item_1
2    | item_1
3    | item_2
4    | item_1
5    | item_2
6    | item_1

In the above example, if we scan two rows at a time downwards, we see that
row 2 and row 3 are different, so we add one to item_1. Next, we see that
row 3 is different from row 4, so we add one to item_2. Continue until we
end up with:

 col2   | col3
--------+------
 item_1 | 2
 item_2 | 2
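If the window functions do turn out to require a HiveContext in 1.4, one
non-Hive sketch is to drop to the RDD API and compare each row with the
previous one via zipWithIndex and a self-join (df, col1 as an Int and col2 as
a String are assumed from the example above):

// Sort by col1, index the rows, then join row i with row i-1.
val rows = df.select("col1", "col2").rdd
  .map(r => (r.getInt(0), r.getString(1)))
  .sortBy(_._1)
  .map(_._2)
  .zipWithIndex()
  .map { case (item, idx) => (idx, item) }

val previous = rows.map { case (idx, item) => (idx + 1, item) }  // row i-1 re-keyed at i

val counts = rows.join(previous)                     // (i, (item_i, item_(i-1)))
  .filter { case (_, (cur, prev)) => cur != prev }   // rows that differ from the previous row
  .map { case (_, (_, prev)) => (prev, 1) }
  .reduceByKey(_ + _)                                // e.g. item_1 -> 2, item_2 -> 2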

Thanks, Mike.


Optimal way to implement a small lookup table for identifiers in an RDD

2015-08-10 Thread Mike Trienis
Hi All,

I have an RDD of case class objects.

scala> case class Entity(
     |   value: String,
     |   identifier: String
     | )
defined class Entity

scala> Entity("hello", "id1")
res25: Entity = Entity(hello,id1)

During a map operation, I'd like to return a new RDD that contains all of
the data of the original RDD with the addition of new data that was looked
up based on the identifiers provided.

The lookup table in Cassandra looks something like...

id  | type
----+--------
id1 | action
id2 | view

The end result would be an RDD of EntityExtended

case class EntityExtended(
  value: String,
  identifier: String,
  `type`: String
)

I believe that it would make sense to use a broadcast variable. However,
I'm not sure what the best way would be to incorporate it during a map
operation.

rdd.map(MyObject.extendEntity)

object MyObject {
  def extendEntity(entity: Entity): EntityExtended = {
    val id = entity.identifier

    // lookup identifier in broadcast variable?
  }
}
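A minimal sketch of one way to wire that up, assuming the lookup table is
small enough to collect from Cassandra and broadcast (the map contents and
variable names are illustrative; sc is the SparkContext):

// Broadcast a small id -> type map once, then close over it in the map step.
val idToType: Map[String, String] = Map("id1" -> "action", "id2" -> "view")
val lookup = sc.broadcast(idToType)

val extendedRdd = rdd.map { entity =>
  EntityExtended(
    value = entity.value,
    identifier = entity.identifier,
    `type` = lookup.value.getOrElse(entity.identifier, "unknown"))
}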

Thanks, Mike.


Re: Data frames select and where clause dependency

2015-07-20 Thread Mike Trienis
Definitely, thanks Mohammed.

On Mon, Jul 20, 2015 at 5:47 PM, Mohammed Guller moham...@glassbeam.com
wrote:

  Thanks, Harish.



 Mike – this would be a cleaner version for your use case:

 df.filter(df("filter_field") === "value").select("field1").show()



 Mohammed



 *From:* Harish Butani [mailto:rhbutani.sp...@gmail.com]
 *Sent:* Monday, July 20, 2015 5:37 PM
 *To:* Mohammed Guller
 *Cc:* Michael Armbrust; Mike Trienis; user@spark.apache.org

 *Subject:* Re: Data frames select and where clause dependency



 Yes via:  org.apache.spark.sql.catalyst.optimizer.ColumnPruning

 See DefaultOptimizer.batches for list of logical rewrites.



 You can see the optimized plan by printing: df.queryExecution.optimizedPlan



 On Mon, Jul 20, 2015 at 5:22 PM, Mohammed Guller moham...@glassbeam.com
 wrote:

 Michael,

 How would the Catalyst optimizer optimize this version?

 df.filter(df("filter_field") === "value").select("field1").show()

 Would it still read all the columns in df or would it read only
 “filter_field” and “field1” since only two columns are used (assuming other
 columns from df are not used anywhere else)?



 Mohammed



 *From:* Michael Armbrust [mailto:mich...@databricks.com]
 *Sent:* Friday, July 17, 2015 1:39 PM
 *To:* Mike Trienis
 *Cc:* user@spark.apache.org
 *Subject:* Re: Data frames select and where clause dependency



 Each operation on a dataframe is completely independent and doesn't know
 what operations happened before it.  When you do a selection, you are
 removing other columns from the dataframe and so the filter has nothing to
 operate on.



 On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 I'd like to understand why the "where" field must exist in the "select"
 clause.



 For example, the following select statement works fine

- df.select("field1", "filter_field").filter(df("filter_field") ===
"value").show()

  However, the next one fails with the error "in operator !Filter
 (filter_field#60 = value);"

- df.select("field1").filter(df("filter_field") === "value").show()

  As a work-around, it seems that I can do the following

- df.select("field1", "filter_field").filter(df("filter_field") ===
"value").drop("filter_field").show()



 Thanks, Mike.







Data frames select and where clause dependency

2015-07-17 Thread Mike Trienis
I'd like to understand why the "where" field must exist in the "select" clause.

For example, the following select statement works fine

   - df.select("field1", "filter_field").filter(df("filter_field") ===
   "value").show()

However, the next one fails with the error "in operator !Filter
(filter_field#60 = value);"

   - df.select("field1").filter(df("filter_field") === "value").show()

As a work-around, it seems that I can do the following

   - df.select("field1", "filter_field").filter(df("filter_field") ===
   "value").drop("filter_field").show()
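(As suggested in the reply thread above, a cleaner form is to filter before
selecting, so the filter still sees filter_field and Catalyst's column pruning
reads only the columns that are used; a sketch with the same names:)

   df.filter(df("filter_field") === "value").select("field1").show()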


Thanks, Mike.


Aggregating metrics using Cassandra and Spark streaming

2015-06-24 Thread Mike Trienis
Hello,

I'd like to understand how other people have been aggregating metrics
using Spark Streaming and a Cassandra database. Currently I have designed
some data models that will store the rolled-up metrics. There are two
models that I am considering:

CREATE TABLE rollup_using_counters (
metric_1 text,
metric_1_value counter
);

The model above is nice because I only need to write and execute a
single query when updating the counter value. The problem is that I
need these counter values to be fairly accurate, and based on some
discussions with the Cassandra folks, it sounds like there is some
potential for over- and under-counting if the database is under load.

CREATE TABLE rollup_using_update (
metric_1 text,
metric_1_value int
);

This model, on the other hand, requires the metric values to be updated
in place, which makes the operation idempotent. The problem is that I
will need to read the metrics into the Spark Streaming application and
perform the addition prior to writing the result back to Cassandra. I
believe that ensures the metrics are accurate, but it also introduces a
lot of complexity and possibly latency into my Spark Streaming
application.

Has anyone else run into this problem before, and how did you solve it?
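A hedged sketch of the second approach: keep the running totals inside Spark
Streaming with updateStateByKey and upsert them into the non-counter table, so
the write stays idempotent (assumes the DataStax spark-cassandra-connector and
checkpointing enabled; keyspace and column names are illustrative):

import com.datastax.spark.connector._
import org.apache.spark.streaming.dstream.DStream

// metrics is a DStream[(String, Int)] of (metric name, increment) pairs parsed upstream.
def rollup(metrics: DStream[(String, Int)]): Unit = {
  val totals = metrics.updateStateByKey[Int] { (increments, state) =>
    Some(state.getOrElse(0) + increments.sum)  // the running total lives in Spark state
  }

  // Re-writing the same (metric_1, metric_1_value) row is idempotent,
  // unlike incrementing a Cassandra counter.
  totals.foreachRDD { rdd =>
    rdd.saveToCassandra("metrics_ks", "rollup_using_update",
      SomeColumns("metric_1", "metric_1_value"))
  }
}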

Thanks, Mike.




Re: Managing spark processes via supervisord

2015-06-05 Thread Mike Trienis
Thanks Igor,

I managed to find a fairly simple solution. It seems that the shell scripts
(e.g. start-master.sh, start-slave.sh) end up executing bin/spark-class,
which always runs in the foreground.

Here is a solution I provided on stackoverflow:

   -
   
http://stackoverflow.com/questions/30672648/how-to-autostart-an-apache-spark-cluster-using-supervisord/30676844#30676844


Cheers Mike


On Wed, Jun 3, 2015 at 12:29 PM, Igor Berman igor.ber...@gmail.com wrote:

 assuming you are talking about a standalone cluster:
 imho, with workers you won't get any problems and it's straightforward
 since they are usually foreground processes.
 With the master it's a bit more complicated: ./sbin/start-master.sh goes to
 the background, which is not good for supervisord, but anyway I think it's
 doable (going to set it up too in a few days)

 On 3 June 2015 at 21:46, Mike Trienis mike.trie...@orcsol.com wrote:

 Hi All,

 I am curious to know if anyone has successfully deployed a spark cluster
 using supervisord?

- http://supervisord.org/

 Currently I am using the cluster launch scripts, which are working
 great; however, every time I reboot my VM or development environment I
 need to re-launch the cluster.

 I am considering using supervisord to control all the processes (worker,
 master, etc.) in order to have the cluster up and running after boot-up,
 although I'd like to understand if it will cause more issues than it
 solves.

 Thanks, Mike.





Managing spark processes via supervisord

2015-06-03 Thread Mike Trienis
Hi All,

I am curious to know if anyone has successfully deployed a spark cluster
using supervisord?

   - http://supervisord.org/

Currently I am using the cluster launch scripts, which are working great;
however, every time I reboot my VM or development environment I need to
re-launch the cluster.

I am considering using supervisord to control all the processes (worker,
master, etc.) in order to have the cluster up and running after boot-up,
although I'd like to understand if it will cause more issues than it
solves.

Thanks, Mike.


Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-23 Thread Mike Trienis
Yup, and since I have only one core per executor, that explains why only
one executor was utilized. I'll need to investigate which EC2 instance
type is going to be the best fit.

Thanks Evo.

On Fri, May 22, 2015 at 3:47 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 A receiver occupies a CPU core; an executor is simply a JVM instance and
 as such it can be granted any number of cores and any amount of RAM.

 So check how many cores you have per executor


 Sent from Samsung Mobile


  Original message 
 From: Mike Trienis
 Date:2015/05/22 21:51 (GMT+00:00)
 To: user@spark.apache.org
 Subject: Re: Spark Streaming: all tasks running on one executor (Kinesis +
 Mongodb)

 I guess each receiver occupies an executor, so there was only one executor
 available for processing the job.

 On Fri, May 22, 2015 at 1:24 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hi All,

 I have a cluster of four nodes (three workers and one master, with one core
 each) which consumes data from Kinesis at 15-second intervals using two
 streams (i.e. receivers). The job simply grabs the latest batch and pushes
 it to MongoDB. I believe that the problem is that all tasks are executed on
 a single worker node and never distributed to the others. This is true even
 after I set the number of concurrentJobs to 3. Overall, I would really like
 to increase throughput (i.e. more than 500 records / second) and understand
 why all executors are not being utilized.

 Here are some parameters I have set:

- spark.streaming.blockInterval   200
- spark.locality.wait 500
- spark.streaming.concurrentJobs  3

 This is the code that's actually doing the writing:

 def write(rdd: RDD[Data], time: Time): Unit = {
   val result = doSomething(rdd, time)
   result.foreachPartition { i =>
     i.foreach(record => connection.insert(record))
   }
 }

 def doSomething(rdd: RDD[Data], time: Time): RDD[MyObject] = {
   rdd.flatMap(MyObject)
 }

 Any ideas as to how to improve the throughput?

 Thanks, Mike.





Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Mike Trienis
Hi All,

I have a cluster of four nodes (three workers and one master, with one core
each) which consumes data from Kinesis at 15-second intervals using two
streams (i.e. receivers). The job simply grabs the latest batch and pushes
it to MongoDB. I believe that the problem is that all tasks are executed on
a single worker node and never distributed to the others. This is true even
after I set the number of concurrentJobs to 3. Overall, I would really like
to increase throughput (i.e. more than 500 records / second) and understand
why all executors are not being utilized.

Here are some parameters I have set:

   - spark.streaming.blockInterval   200
   - spark.locality.wait 500
   - spark.streaming.concurrentJobs  3

This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time: Time): Unit = {
  val result = doSomething(rdd, time)
  result.foreachPartition { i =>
    i.foreach(record => connection.insert(record))
  }
}

def doSomething(rdd: RDD[Data], time: Time): RDD[MyObject] = {
  rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput?
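One hedged sketch of spreading the work out, along the lines of the
KinesisWordCount example: one receiver per shard, union the streams, and
repartition before the write (parse, numShards, numCores, ssc, streamName and
endpointUrl are illustrative placeholders; the createStream signature is the
Spark 1.3-era one):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils

val streams = (1 to numShards).map { _ =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl,
    Seconds(15), InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}

// Union the per-shard streams and repartition so the insert tasks are not
// pinned to the single node that hosts a receiver.
val records = ssc.union(streams).repartition(numCores).map(parse)

records.foreachRDD((rdd, time) => write(rdd, time))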

Thanks, Mike.


Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Mike Trienis
I guess each receiver occupies an executor, so there was only one executor
available for processing the job.

On Fri, May 22, 2015 at 1:24 PM, Mike Trienis mike.trie...@orcsol.com
wrote:

 Hi All,

 I have a cluster of four nodes (three workers and one master, with one core
 each) which consumes data from Kinesis at 15-second intervals using two
 streams (i.e. receivers). The job simply grabs the latest batch and pushes
 it to MongoDB. I believe that the problem is that all tasks are executed on
 a single worker node and never distributed to the others. This is true even
 after I set the number of concurrentJobs to 3. Overall, I would really like
 to increase throughput (i.e. more than 500 records / second) and understand
 why all executors are not being utilized.

 Here are some parameters I have set:

- spark.streaming.blockInterval   200
- spark.locality.wait 500
- spark.streaming.concurrentJobs  3

 This is the code that's actually doing the writing:

 def write(rdd: RDD[Data], time: Time): Unit = {
   val result = doSomething(rdd, time)
   result.foreachPartition { i =>
     i.foreach(record => connection.insert(record))
   }
 }

 def doSomething(rdd: RDD[Data], time: Time): RDD[MyObject] = {
   rdd.flatMap(MyObject)
 }

 Any ideas as to how to improve the throughput?

 Thanks, Mike.



Re: Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Mike Trienis
Hey Chris!

I was happy to see the documentation outlining that issue :-) However, I
must have got into a pretty terrible state because I had to delete and
recreate the kinesis streams as well as the DynamoDB tables.

Thanks for the reply, everything is sorted.

Mike




On Fri, May 8, 2015 at 7:55 PM, Chris Fregly ch...@fregly.com wrote:

 hey mike-

 as you pointed out here from my docs, changing the stream name is
 sometimes problematic due to the way the Kinesis Client Library manages
 leases and checkpoints, etc in DynamoDB.

 I noticed this directly while developing the Kinesis connector which is
 why I highlighted the issue here.

 is wiping out the DynamoDB table a suitable workaround for now?  usually
 in production, you wouldn't be changing stream names often, so hopefully
 that's ok for dev.

 otherwise, can you share the relevant spark streaming logs that are
 generated when you do this?

 I saw a lot of "lease not owned by this Kinesis Client" type of errors,
 from what I remember.

 lemme know!

 -Chris

 On May 8, 2015, at 4:36 PM, Mike Trienis mike.trie...@orcsol.com wrote:


- [Kinesis stream name]: The Kinesis stream that this streaming
application receives from
   - The application name used in the streaming context becomes the
   Kinesis application name
   - The application name must be unique for a given account and
   region.
   - The Kinesis backend automatically associates the application name
   to the Kinesis stream using a DynamoDB table (always in the us-east-1
   region) created during Kinesis Client Library initialization.
   - *Changing the application name or stream name can lead to Kinesis
   errors in some cases. If you see errors, you may need to manually delete
   the DynamoDB table.*


 On Fri, May 8, 2015 at 2:06 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hi All,

 I am submitting the assembled fat jar file by the command:

 bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar
 --class com.xxx.Consumer -0.1-SNAPSHOT.jar

 It reads the data file from kinesis using the stream name defined in a
 configuration file. It turns out that it reads the data perfectly fine for
 one stream name (i.e. the default), however, if I switch the stream name
 and re-submit the jar, it no longer reads the data. From CloudWatch, I can
 see that there is data put into the stream and spark is actually sending
 get requests as well. However, it doesn't seem to be outputting the data.

 Has anyone else encountered a similar issue? Does spark cache the stream
 name somewhere? I also have checkpointing enabled as well.

 Thanks, Mike.









Re: Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Mike Trienis
   - [Kinesis stream name]: The Kinesis stream that this streaming
   application receives from
  - The application name used in the streaming context becomes the
  Kinesis application name
  - The application name must be unique for a given account and region.
  - The Kinesis backend automatically associates the application name
  to the Kinesis stream using a DynamoDB table (always in the us-east-1
  region) created during Kinesis Client Library initialization.
  - *Changing the application name or stream name can lead to Kinesis
  errors in some cases. If you see errors, you may need to manually delete
  the DynamoDB table.*


On Fri, May 8, 2015 at 2:06 PM, Mike Trienis mike.trie...@orcsol.com
wrote:

 Hi All,

 I am submitting the assembled fat jar file by the command:

 bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar
 --class com.xxx.Consumer -0.1-SNAPSHOT.jar

 It reads the data file from kinesis using the stream name defined in a
 configuration file. It turns out that it reads the data perfectly fine for
 one stream name (i.e. the default), however, if I switch the stream name
 and re-submit the jar, it no longer reads the data. From CloudWatch, I can
 see that there is data put into the stream and spark is actually sending
 get requests as well. However, it doesn't seem to be outputting the data.

 Has anyone else encountered a similar issue? Does spark cache the stream
 name somewhere? I also have checkpointing enabled as well.

 Thanks, Mike.








Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Mike Trienis
Hi All,

I am submitting the assembled fat jar file by the command:

bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar --class
com.xxx.Consumer -0.1-SNAPSHOT.jar

It reads data from Kinesis using the stream name defined in a
configuration file. It turns out that it reads the data perfectly fine for
one stream name (i.e. the default); however, if I switch the stream name
and re-submit the jar, it no longer reads the data. From CloudWatch, I can
see that there is data put into the stream and Spark is actually sending
get requests as well. However, it doesn't seem to be outputting the data.

Has anyone else encountered a similar issue? Does spark cache the stream
name somewhere? I also have checkpointing enabled as well.

Thanks, Mike.


Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-14 Thread Mike Trienis
Hi Vadim,

After removing "provided" from "org.apache.spark" %%
"spark-streaming-kinesis-asl", I ended up with a huge number of
deduplication errors:

https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a

It would be nice if you could share some pieces of your mergeStrategy code
for reference.
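Not Vadim's exact settings, but a commonly used shape, as a hedged sketch
(assumes sbt-assembly 0.12+ where assemblyMergeStrategy and its keys are
auto-imported in build.sbt; adjust the cases to the conflicts your own build
reports):

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard  // drop duplicate manifests/poms
  case "rootdoc.txt"                 => MergeStrategy.concat
  case _                             => MergeStrategy.first    // otherwise take the first copy
}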

Also, after adding "provided" back to spark-streaming-kinesis-asl, I
submit the spark job with the spark-streaming-kinesis-asl jar file:

sh /usr/lib/spark/bin/spark-submit --verbose --jars
lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer
target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar

I still end up with the following error...

Exception in thread "main" java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeFormat
at com.amazonaws.auth.AWS4Signer.<clinit>(AWS4Signer.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)

Has anyone else run into this issue?



On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 I don't believe the Kinesis asl should be provided. I used mergeStrategy
 successfully to produce an uber jar.

 Fyi, I've been having trouble consuming data out of Kinesis with Spark
 with no success :(
 Would be curious to know if you got it working.

 Vadim

 On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote:

 Hi All,

 I am having trouble building a fat jar file through sbt-assembly.

 [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
 strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
 [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/services/java.sql.Driver' with strategy
 'filterDistinctLines'
 [warn] Merging 'rootdoc.txt' with strategy 'concat'
 [warn] Strategy 'concat' was applied to a file
 [warn] Strategy 'discard' was applied to 17 files
 [warn] Strategy 'filterDistinctLines' was applied to a file
 [warn] Strategy 'rename' was applied to 4 files

 When submitting the spark application through the command

 sh /usr/lib/spark/bin/spark-submit --class com.xxx.ExampleClassName
 target/scala-2.10/-snapshot.jar

 I end up with the following error,

 Exception in thread main java.lang.NoClassDefFoundError:
 org/joda/time/format/DateTimeFormat
 at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at java.lang.Class.newInstance(Class.java:379)
 at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
 at
 com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
 at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
 at
 com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307

Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-14 Thread Mike Trienis
Richard,

Your response was very helpful and actually resolved my issue. In case
others run into a similar issue, I followed this procedure:

   - Upgraded to Spark 1.3.0
   - Marked all Spark-related libraries as provided
   - Included the Spark Kinesis module's transitive library dependencies
     directly (as shown below)

where my build.sbt file contains:

libraryDependencies ++= {
  Seq(
    "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
    "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
    "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided",
    "joda-time" % "joda-time" % "2.2",
    "org.joda" % "joda-convert" % "1.2",
    "com.amazonaws" % "aws-java-sdk" % "1.8.3",
    "com.amazonaws" % "amazon-kinesis-client" % "1.2.0")
}

and submitting a spark job can be done via

sh ./spark-1.3.0-bin-cdh4/bin/spark-submit --jars
spark-streaming-kinesis-asl_2.10-1.3.0.jar --verbose --class
com.xxx.MyClass target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar

Thanks again Richard!

Cheers Mike.


On Tue, Apr 14, 2015 at 11:01 AM, Richard Marscher rmarsc...@localytics.com
 wrote:

 Hi,

 I've gotten an application working with sbt-assembly and spark, thought
 I'd present an option. In my experience, trying to bundle any of the Spark
 libraries in your uber jar is going to be a major pain. There will be a lot
 of deduplication to work through and even if you resolve them it can be
 easy to do it incorrectly. I considered it an intractable problem. So the
 alternative is to not include those jars in your uber jar. For this to work
 you will need the same libraries on the classpath of your Spark cluster and
 your driver program (if you are running that as an application and not just
 using spark-submit).

 As for your NoClassDefFoundError, you either are missing Joda Time in your
 runtime classpath or have conflicting versions. It looks like something
 related to AWS wants to use it. Check your uber jar to see if it's including
 org/joda/time, as well as the classpath of your Spark cluster. For
 example: I use the Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory
 has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark
 1.2 I found a conflict between httpclient versions that my uber jar pulled
 in for AWS libraries and the one bundled in the spark uber jar. I hand
 patched the spark uber jar to remove the offending httpclient bytecode to
 resolve the issue. You may be facing a similar situation.

 I hope that gives some ideas for resolving your issue.

 Regards,
 Rich

 On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hi Vadim,

 After removing provided from org.apache.spark %%
 spark-streaming-kinesis-asl I ended up with huge number of deduplicate
 errors:

 https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a

 It would be nice if you could share some pieces of your mergeStrategy
 code for reference.

 Also, after adding provided back to spark-streaming-kinesis-asl and I
 submit the spark job with the spark-streaming-kinesis-asl jar file

 sh /usr/lib/spark/bin/spark-submit --verbose --jars
 lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer
 target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar

 I still end up with the following error...

 Exception in thread main java.lang.NoClassDefFoundError:
 org/joda/time/format/DateTimeFormat
 at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at java.lang.Class.newInstance(Class.java:379)

 Has anyone else run into this issue?



 On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:

 I don't believe the Kinesis asl should be provided. I used mergeStrategy
 successfully to produce an uber jar.

 Fyi, I've been having trouble consuming data out of Kinesis with Spark
 with no success :(
 Would be curious to know if you got it working.

 Vadim

 On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 Hi All,

 I am having trouble building a fat jar file through sbt-assembly.

 [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
 strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp

sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Mike Trienis
Hi All,

I am having trouble building a fat jar file through sbt-assembly.

[warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Merging
'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
strategy 'discard'
[warn] Merging
'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with
strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties'
with strategy 'discard'
[warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with
strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy
'discard'
[warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy
'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties' with
strategy 'discard'
[warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with
strategy 'discard'
[warn] Merging 'META-INF/services/java.sql.Driver' with strategy
'filterDistinctLines'
[warn] Merging 'rootdoc.txt' with strategy 'concat'
[warn] Strategy 'concat' was applied to a file
[warn] Strategy 'discard' was applied to 17 files
[warn] Strategy 'filterDistinctLines' was applied to a file
[warn] Strategy 'rename' was applied to 4 files

When submitting the spark application through the command

sh /usr/lib/spark/bin/spark-submit --class com.xxx.ExampleClassName
target/scala-2.10/-snapshot.jar

I end up with the following error,

Exception in thread "main" java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeFormat
at com.amazonaws.auth.AWS4Signer.<clinit>(AWS4Signer.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
at
com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
at
com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
at
com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
at
com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:216)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:202)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:175)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:155)
at com.quickstatsengine.aws.AwsProvider$.<init>(AwsProvider.scala:20)
at com.quickstatsengine.aws.AwsProvider$.<clinit>(AwsProvider.scala)

The snippet from my build.sbt file is:

"org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.2.0" % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" %
"1.2.0-alpha1" % "provided",
"org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.2.0" %
"provided",

And the error is originating from:

val kinesisClient = new AmazonKinesisClient(new
DefaultAWSCredentialsProviderChain())

Am I correct to set spark-streaming-kinesis-asl as a *provided* dependency?
Also, is there a merge strategy I need to apply?

Any help would be appreciated, Mike.


Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-13 Thread Mike Trienis
Thanks Vadim, I can certainly consume data from a Kinesis stream when
running locally. I'm currently in the process of extending my work to a
proper cluster (i.e. using a spark-submit job via an uber jar). Feel free to
add me to gmail chat and maybe we can help each other.

On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy 
vadim.bichuts...@gmail.com wrote:

 I don't believe the Kinesis asl should be provided. I used mergeStrategy
 successfully to produce an uber jar.

 Fyi, I've been having trouble consuming data out of Kinesis with Spark
 with no success :(
 Would be curious to know if you got it working.

 Vadim

 On Apr 13, 2015, at 9:36 PM, Mike Trienis mike.trie...@orcsol.com wrote:

 Hi All,

 I am having trouble building a fat jar file through sbt-assembly.

 [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
 [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
 [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
 strategy 'discard'
 [warn] Merging
 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy 'discard'
 [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with
 strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy
 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.properties'
 with strategy 'discard'
 [warn] Merging 'META-INF/maven/org.slf4j/slf4j-log4j12/pom.xml' with
 strategy 'discard'
 [warn] Merging 'META-INF/services/java.sql.Driver' with strategy
 'filterDistinctLines'
 [warn] Merging 'rootdoc.txt' with strategy 'concat'
 [warn] Strategy 'concat' was applied to a file
 [warn] Strategy 'discard' was applied to 17 files
 [warn] Strategy 'filterDistinctLines' was applied to a file
 [warn] Strategy 'rename' was applied to 4 files

 When submitting the spark application through the command

 sh /usr/lib/spark/bin/spark-submit -class com.xxx.ExampleClassName
 target/scala-2.10/-snapshot.jar

 I end up with the following error,

 Exception in thread main java.lang.NoClassDefFoundError:
 org/joda/time/format/DateTimeFormat
 at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at java.lang.Class.newInstance(Class.java:379)
 at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
 at
 com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
 at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
 at
 com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
 at
 com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
 at
 com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:216)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:202)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:175)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:155)
 at com.quickstatsengine.aws.AwsProvider$.init(AwsProvider.scala:20)
 at com.quickstatsengine.aws.AwsProvider$.clinit(AwsProvider.scala)

 The snippet from my build.sbt file is:

 org.apache.spark %% spark-core % 1.2.0 % provided,
 org.apache.spark %% spark-streaming % 1.2.0 % provided,
 com.datastax.spark %% spark-cassandra

Re: Cannot run unit test.

2015-04-08 Thread Mike Trienis
It's because your tests are running in parallel and you can only have one
context running at a time. 
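In sbt, the corresponding setting would be something like the sketch below
(assuming suite-level parallelism is what triggers the multiple contexts):

// build.sbt: run test suites sequentially so only one SparkContext is live at a time
parallelExecution in Test := false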






Re: Spark Streaming S3 Performance Implications

2015-04-01 Thread Mike Trienis
Hey Chris,

Apologies for the delayed reply. Your responses are always insightful and
appreciated :-)

However, I have a few more questions.

also, it looks like you're writing to S3 per RDD.  you'll want to broaden
that out to write DStream batches

I assume you mean dstream.saveAsTextFiles() vs rdd.saveAsTextFile().
Although, looking at the source code of DStream.scala, saveAsTextFiles is
simply wrapping rdd.saveAsTextFile:

  def saveAsTextFiles(prefix: String, suffix: String = "") {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc)
  }

So it's not clear to me how it would improve the throughput.

Also, regarding your comment "expand even further and write window batches
(where the window interval is a multiple of the batch interval)": I still
don't quite understand the mechanics underneath. For example, what would be
the difference between extending the batch interval and adding windowed
batches? I presume it has something to do with the processor thread(s)
within a receiver?
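For concreteness, a hedged sketch of the windowed variant (the batch interval
stays at 1 second, but each S3 write covers a 60-second window; the bucket
path and intervals are illustrative):

// Fewer, larger writes: one saveAsTextFile per 60-second window instead of per batch.
val windowed = dataStream.window(Seconds(60), Seconds(60))

windowed.foreachRDD { (rdd, time) =>
  if (rdd.count() > 0) {
    rdd.saveAsTextFile("s3n://bucket/prefix-" + time.milliseconds)
  }
}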

By the way, in the near future, I'll be putting together some performance
numbers on a proper deployment, and will be sure to share my findings.

Thanks, Mike!



On Sat, Mar 21, 2015 at 8:09 AM, Chris Fregly ch...@fregly.com wrote:

 hey mike!

 you'll definitely want to increase your parallelism by adding more shards
 to the stream - as well as spinning up 1 receiver per shard and unioning
 all the shards per the KinesisWordCount example that is included with the
 kinesis streaming package.

 you'll need more cores (cluster) or threads (local) to support this -
 equalling at least the number of shards/receivers + 1.

 also, it looks like you're writing to S3 per RDD.  you'll want to broaden
 that out to write DStream batches - or expand  even further and write
 window batches (where the window interval is a multiple of the batch
 interval).

 this goes for any spark streaming implementation - not just Kinesis.

 lemme know if that works for you.

 thanks!

 -Chris
 _
 From: Mike Trienis mike.trie...@orcsol.com
 Sent: Wednesday, March 18, 2015 2:45 PM
 Subject: Spark Streaming S3 Performance Implications
 To: user@spark.apache.org



  Hi All,

  I am pushing data from Kinesis stream to S3 using Spark Streaming and
 noticed that during testing (i.e. master=local[2]) the batches (1 second
 intervals) were falling behind the incoming data stream at about 5-10
 events / second. It seems that the rdd.saveAsTextFile(s3n://...) is taking
 at a few seconds to complete.

   val saveFunc = (rdd: RDD[String], time: Time) = {

  val count = rdd.count()

  if (count  0) {

  val s3BucketInterval = time.milliseconds.toString

 rdd.saveAsTextFile(s3n://...)

  }
  }

  dataStream.foreachRDD(saveFunc)


  Should I expect the same behaviour in a deployed cluster? Or does the
 rdd.saveAsTextFile(s3n://...) distribute the push work to each worker node?

  Write the elements of the dataset as a text file (or set of text files)
 in a given directory in the local filesystem, HDFS or any other
 Hadoop-supported file system. Spark will call toString on each element to
 convert it to a line of text in the file.

  Thanks, Mike.





Spark Streaming S3 Performance Implications

2015-03-18 Thread Mike Trienis
Hi All,

I am pushing data from a Kinesis stream to S3 using Spark Streaming and
noticed that during testing (i.e. master=local[2]) the batches (1-second
intervals) were falling behind the incoming data stream at about 5-10
events / second. It seems that rdd.saveAsTextFile("s3n://...") is taking
a few seconds to complete.

val saveFunc = (rdd: RDD[String], time: Time) => {

  val count = rdd.count()

  if (count > 0) {

    val s3BucketInterval = time.milliseconds.toString

    rdd.saveAsTextFile("s3n://...")

  }
}

dataStream.foreachRDD(saveFunc)


Should I expect the same behaviour in a deployed cluster? Or does the
rdd.saveAsTextFile(s3n://...) distribute the push work to each worker node?

Write the elements of the dataset as a text file (or set of text files) in
a given directory in the local filesystem, HDFS or any other
Hadoop-supported file system. Spark will call toString on each element to
convert it to a line of text in the file.

Thanks, Mike.


Re: Writing to S3 and retrieving folder names

2015-03-05 Thread Mike Trienis
Please ignore my question; you can simply specify the root directory and it
looks like Redshift takes care of the rest:

copy mobile
from 's3://BUCKET_NAME/'
credentials 
json 's3://BUCKET_NAME/jsonpaths.json'

On Thu, Mar 5, 2015 at 3:33 PM, Mike Trienis mike.trie...@orcsol.com
wrote:

 Hi All,

 I am receiving data from AWS Kinesis using Spark Streaming and am writing
 the data collected in the dstream to s3 using output function:

 dstreamData.saveAsTextFiles(s3n://XXX:XXX@/)

 After the run the application for several seconds, I end up with a
 sequence of directories in S3 that look like [PREFIX]-1425597204000.

 At the same time I'd like to run a copy command on Redshift that pulls
 over the exported data. The problem is that I am not sure how to extract
 the folder names from the dstream object in order to construct the
 appropriate COPY command.


 https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.streaming.dstream.DStream

 Anyone have any ideas?

 Thanks, Mike.



Writing to S3 and retrieving folder names

2015-03-05 Thread Mike Trienis
Hi All,

I am receiving data from AWS Kinesis using Spark Streaming and am writing
the data collected in the dstream to S3 using the output function:

dstreamData.saveAsTextFiles("s3n://XXX:XXX@/")

After running the application for several seconds, I end up with a sequence
of directories in S3 that look like [PREFIX]-1425597204000.

At the same time I'd like to run a copy command on Redshift that pulls over
the exported data. The problem is that I am not sure how to extract the
folder names from the dstream object in order to construct the appropriate
COPY command.

https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.streaming.dstream.DStream

Anyone have any ideas?
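One hedged sketch: mirror the prefix-plus-timestamp naming inside foreachRDD,
so the save and the COPY both see the same folder name (runRedshiftCopy is a
hypothetical helper; the bucket and prefix are placeholders):

dstreamData.foreachRDD { (rdd, time) =>
  // saveAsTextFiles names folders as <prefix>-<time.milliseconds>, so build the
  // same name here and hand it to the COPY step.
  val folder = "s3n://XXX:XXX@bucket/prefix-" + time.milliseconds
  rdd.saveAsTextFile(folder)
  runRedshiftCopy(folder)  // hypothetical: issue the Redshift COPY for this folder via JDBC
}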

Thanks, Mike.


Pushing data from AWS Kinesis - Spark Streaming - AWS Redshift

2015-03-01 Thread Mike Trienis
Hi All,

I am looking at integrating a data stream from AWS Kinesis to AWS Redshift
and since I am already ingesting the data through Spark Streaming, it seems
convenient to also push that data to AWS Redshift at the same time.

I have taken a look at the AWS kinesis connector although I am not sure it
was designed to integrate with Apache Spark. It seems more like a
standalone approach:

   - https://github.com/awslabs/amazon-kinesis-connectors

There is also a Spark redshift integration library, however, it looks like
it was intended for pulling data rather than pushing data to AWS Redshift:

   - https://github.com/databricks/spark-redshift

I finally took a look at a general Scala library that integrates with AWS
Redshift:

   - http://scalikejdbc.org/

Does anyone have any experience pushing data from Spark Streaming to AWS
Redshift? Does it make sense conceptually, or does it make more sense to
push data from AWS Kinesis to AWS Redshift VIA another standalone approach
such as the AWS Kinesis connectors.

Thanks, Mike.


Integrating Spark Streaming with Reactive Mongo

2015-02-26 Thread Mike Trienis
Hi All,

I have Spark Streaming set up to write data to a replicated MongoDB database
and would like to understand whether there would be any issues using the
Reactive Mongo library to write directly to MongoDB. My stack is Apache Spark
sitting on top of Cassandra for the datastore, so my thinking is that the
MongoDB connector for Hadoop will not be particularly useful for me since I'm
not using HDFS. Is there anything that I'm missing?

Here is an example of code that I'm planning on using as a starting point
for my implementation. 

LogAggregator
https://github.com/chimpler/blog-spark-streaming-log-aggregation/blob/master/src/main/scala/com/chimpler/sparkstreaminglogaggregation/LogAggregator.scala
  

Thanks, Mike. 






Integrating Spark Streaming with Reactive Mongo

2015-02-26 Thread Mike Trienis
Hi All,

I have Spark Streaming set up to write data to a replicated MongoDB database
and would like to understand whether there would be any issues using the
Reactive Mongo library to write directly to MongoDB. My stack is Apache Spark
sitting on top of Cassandra for the datastore, so my thinking is that the
MongoDB connector for Hadoop will not be particularly useful for me since I'm
not using HDFS. Is there anything that I'm missing?

Here is an example of code that I'm planning on using as a starting point
for my implementation.

LogAggregator
https://github.com/chimpler/blog-spark-streaming-log-aggregation/blob/master/src/main/scala/com/chimpler/sparkstreaminglogaggregation/LogAggregator.scala

Thanks, Mike.


Re: Datastore HDFS vs Cassandra

2015-02-11 Thread Mike Trienis
Thanks everyone for your responses. I'll definitely think carefully about
the data models, querying patterns and fragmentation side-effects.

Cheers, Mike.

On Wed, Feb 11, 2015 at 1:14 AM, Franc Carter franc.car...@rozettatech.com
wrote:


 I forgot to mention that if you do decide to use Cassandra, I'd highly
 recommend jumping on the Cassandra mailing list; if we had taken in some of
 the advice on that list, things would have been considerably smoother

 cheers

 On Wed, Feb 11, 2015 at 8:12 PM, Christian Betz 
 christian.b...@performance-media.de wrote:

   Hi

  Regarding the Cassandra Data model, there's an excellent post on the
 ebay tech blog:
 http://www.ebaytechblog.com/2012/07/16/cassandra-data-modeling-best-practices-part-1/.
 There's also a slideshare for this somewhere.

  Happy hacking

  Chris

   Von: Franc Carter franc.car...@rozettatech.com
 Datum: Mittwoch, 11. Februar 2015 10:03
 An: Paolo Platter paolo.plat...@agilelab.it
 Cc: Mike Trienis mike.trie...@orcsol.com, user@spark.apache.org 
 user@spark.apache.org
 Betreff: Re: Datastore HDFS vs Cassandra


  One additional comment I would make is that you should be careful with
 Updates in Cassandra. It does support them, but large amounts of Updates
 (i.e. changing existing keys) tend to cause fragmentation. If you are
 (mostly) adding new keys (e.g. new records in the time series) then
 Cassandra can be excellent

  cheers


 On Wed, Feb 11, 2015 at 6:13 PM, Paolo Platter paolo.plat...@agilelab.it
  wrote:

   Hi Mike,

 I developed a solution with Cassandra and Spark, using DSE.
 The main difficulty is with Cassandra: you need to understand its data
 model and its query patterns very well.
 Cassandra has better performance than HDFS, and it has DR and stronger
 availability.
 HDFS is a filesystem; Cassandra is a DBMS.
 Cassandra supports full CRUD without ACID.
 HDFS is more flexible than Cassandra.

 In my opinion, if you have a real time series, go with Cassandra, paying
 attention to your reporting data access patterns.

 Paolo

 Inviata dal mio Windows Phone
  --
 Da: Mike Trienis mike.trie...@orcsol.com
 Inviato: ?11/?02/?2015 05:59
 A: user@spark.apache.org
 Oggetto: Datastore HDFS vs Cassandra

   Hi,

 I am considering implementing Apache Spark on top of a Cassandra database
 after listening to a related talk and reading through the slides from
 DataStax. It seems to fit well with our time-series data and reporting
 requirements.


 http://www.slideshare.net/patrickmcfadin/apache-cassandra-apache-spark-for-time-series-data

 Does anyone have any experience using Apache Spark and Cassandra,
 including limitations (and/or) technical difficulties? How does Cassandra
 compare with HDFS, and what use cases would make HDFS more suitable?

 Thanks, Mike.







  --

 *Franc Carter* | Systems Architect | Rozetta Technology

 franc.car...@rozettatech.com  franc.car...@rozettatech.com|
 www.rozettatechnology.com

 Tel: +61 2 8355 2515

 Level 4, 55 Harrington St, The Rocks NSW 2000

 PO Box H58, Australia Square, Sydney NSW 1215

 AUSTRALIA



