Re: Connection pooling in spark jobs

2015-04-03 Thread Charles Feduke
Out of curiosity I wanted to see what JBoss supported in terms of
clustering and database connection pooling since its implementation should
suffice for your use case. I found:

*Note:* JBoss does not recommend using this feature on a production
environment. It requires accessing a connection pool remotely and this is
an anti-pattern as connections are not serializable. Besides, transaction
propagation is not supported and it could lead to connection leaks if the
remote clients are unreliable (i.e crashes, network failure). If you do
need to access a datasource remotely, JBoss recommends accessing it via a
remote session bean facade.[1]

You probably aren't worried about transactions; I gather from your use case
you are just pulling this data in a read-only fashion. That being said,
JBoss appears to have something.

The other thing to look for is whether or not a solution exists in Hadoop;
I can't find anything for JDBC connection pools over a cluster (just pools
local to a mapper which is similar to what Cody suggested earlier for Spark
and partitions).

If you were talking about a high volume web application then I'd believe
the extra effort for connection pooling [over the cluster] would be worth
it. Unless you're planning on executing several hundred parallel jobs, is
avoiding that small amount of overhead really worth the time necessary to
develop a solution? (I'm guessing a solution doesn't exist because the
pattern where
it would be an issue just isn't a common use case for Spark. I went down
this path - connection pooling - myself originally and found a single
connection per executor was fine for my needs. Local connection pools for
the partition as Cody said previously would also work for my use case.)

A local connection pool that was shared amongst all executors on a node
isn't a solution since different jobs execute under different JVMs even
when on the same worker node.[2]

1. https://developer.jboss.org/wiki/ConfigDataSources
2. http://spark.apache.org/docs/latest/cluster-overview.html
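
(For what it's worth, a minimal sketch of the single-connection-per-executor
approach I mention above; the object name and JDBC URL are made up. A lazy val
inside an object is initialized at most once per executor JVM, so every task
running in that executor reuses the same connection:)

import java.sql.{Connection, DriverManager}

object ExecutorSideDb {
  // created the first time any task on this executor touches it, then reused
  lazy val connection: Connection =
    DriverManager.getConnection("jdbc:oracle:thin:@dbhost:1521/orcl", "user", "pass")
}

rdd.foreachPartition { rows =>
  val conn = ExecutorSideDb.connection
  rows.foreach { row => /* read with conn */ }
}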



On Fri, Apr 3, 2015 at 1:39 AM Sateesh Kavuri sateesh.kav...@gmail.com
wrote:

 Each executor runs for about 5 secs, during which time the db connection can
 potentially be open. Each executor will have 1 connection open.
 Connection pooling surely has its advantages for performance and for not
 hitting the db server for every open/close. The database in question is not
 just used by the Spark jobs, but is shared by other systems, and so the
 Spark jobs have to be better at managing the resources.

 I am not really looking for a db connections counter (will let the db
 handle that part), but rather have a pool of connections on spark end so
 that the connections can be reused across jobs


 On Fri, Apr 3, 2015 at 10:21 AM, Charles Feduke charles.fed...@gmail.com
 wrote:

 How long does each executor keep the connection open for? How many
 connections does each executor open?

 Are you certain that connection pooling is a performant and suitable
 solution? Are you running out of resources on the database server and
 cannot tolerate each executor having a single connection?

 If you need a solution that limits the number of open connections
 [resource starvation on the DB server] I think you'd have to fake it with a
 centralized counter of active connections, and logic within each executor
 that blocks when the counter is at a given threshold. If the counter is not
 at threshold, then an active connection can be created (after incrementing
 the shared counter). You could use something like ZooKeeper to store the
 counter value. This would have the overall effect of decreasing performance
 if your required number of connections outstrips the database's resources.

 On Fri, Apr 3, 2015 at 12:22 AM Sateesh Kavuri sateesh.kav...@gmail.com
 wrote:

 But this basically means that the pool is confined to the job (of a
 single app) in question, but is not sharable across multiple apps?
 The setup we have is a job server (the spark-jobserver) that creates
 jobs. Currently, we have each job opening and closing a connection to the
 database. What we would like to achieve is for each of the jobs to obtain a
 connection from a db pool

 Any directions on how this can be achieved?

 --
 Sateesh

 On Thu, Apr 2, 2015 at 7:00 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Connection pools aren't serializable, so you generally need to set them
 up inside of a closure.  Doing that for every item is wasteful, so you
 typically want to use mapPartitions or foreachPartition

 rdd.mapPartitions { part =>
   setupPool
   part.map { ...



 See Design Patterns for using foreachRDD in
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams

 On Thu, Apr 2, 2015 at 7:52 AM, Sateesh Kavuri 
 sateesh.kav...@gmail.com wrote:

 Right, I am aware of how to use connection pooling with Oracle, but
 the specific question is how to use it in the context of Spark job
 execution
 On 2 Apr 2015 17:41, Ted Yu yuzhih...@gmail.com wrote

Re: Spark Streaming Worker runs out of inodes

2015-04-03 Thread Charles Feduke
You could also try setting your `nofile` value in /etc/security/limits.conf
for `soft` to some ridiculously high value if you haven't done so already.
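
(Something like this, with the value and the user name only illustrative:)

# /etc/security/limits.conf
sparkuser    soft    nofile    1000000
sparkuser    hard    nofile    1000000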

On Fri, Apr 3, 2015 at 2:09 AM Akhil Das ak...@sigmoidanalytics.com wrote:

 Did you try these?

 - Disable shuffle : spark.shuffle.spill=false
 - Enable log rotation:

 sparkConf.set("spark.executor.logs.rolling.strategy", "size")
 .set("spark.executor.logs.rolling.size.maxBytes", "1024")
 .set("spark.executor.logs.rolling.maxRetainedFiles", "3")


 Thanks
 Best Regards

 On Fri, Apr 3, 2015 at 9:09 AM, a mesar amesa...@gmail.com wrote:

 Yes, with spark.cleaner.ttl set there is no cleanup.  We pass 
 --properties-file
 spark-dev.conf to spark-submit where  spark-dev.conf contains:

 spark.master spark://10.250.241.66:7077
 spark.logConf true
 spark.cleaner.ttl 1800
 spark.executor.memory 10709m
 spark.cores.max 4
 spark.shuffle.consolidateFiles true

 On Thu, Apr 2, 2015 at 7:12 PM, Tathagata Das t...@databricks.com
 wrote:

 Are you saying that even with the spark.cleaner.ttl set your files are
 not getting cleaned up?

 TD

 On Thu, Apr 2, 2015 at 8:23 AM, andrem amesa...@gmail.com wrote:

 Apparently Spark Streaming 1.3.0 is not cleaning up its internal files
 and
 the worker nodes eventually run out of inodes.
 We see tons of old shuffle_*.data and *.index files that are never
 deleted.
 How do we get Spark to remove these files?

 We have a simple standalone app with one RabbitMQ receiver and a two
 node
 cluster (2 x r3large AWS instances).
 Batch interval is 10 minutes after which we process data and write
 results
 to DB. No windowing or state mgmt is used.

 I've poured over the documentation and tried setting the following
 properties but they have not helped.
 As a work around we're using a cron script that periodically cleans up
 old
 files but this has a bad smell to it.

 SPARK_WORKER_OPTS in spark-env.sh on every worker node
   spark.worker.cleanup.enabled true
   spark.worker.cleanup.interval
   spark.worker.cleanup.appDataTtl

 Also tried on the driver side:
   spark.cleaner.ttl
   spark.shuffle.consolidateFiles true



 --
 View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Worker-runs-out-of-inodes-tp22355.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Which OS for Spark cluster nodes?

2015-04-03 Thread Charles Feduke
As Akhil says Ubuntu is a good choice if you're starting from near scratch.

Cloudera CDH virtual machine images[1] include Hadoop, HDFS, Spark, and
other big data tools so you can get a cluster running with very little
effort. Keep in mind Cloudera is a for-profit corporation so they are also
selling a product.

Personally I prefer the EC2 scripts[2] that ship with the downloadable
Spark distribution. It provisions a cluster for you on AWS and you can
easily terminate the cluster when you don't need it. Ganglia (monitoring),
HDFS (ephemeral and EBS backed), Tachyon (caching), and Spark are all
installed automatically. For learning, using a cluster of 4 medium machines
is fairly inexpensive. (I use the EC2 scripts for both an integration and
production environment.)

1.
http://www.cloudera.com/content/cloudera/en/products-and-services/cdh.html
2. https://spark.apache.org/docs/latest/ec2-scripts.html
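
(To make that concrete, a typical session with the EC2 scripts looks roughly
like this; the key pair, cluster size, instance type, and cluster name are all
placeholders:)

./ec2/spark-ec2 -k my-keypair -i ~/my-keypair.pem -s 4 --instance-type=m3.medium launch learning-cluster
# ... use the cluster ...
./ec2/spark-ec2 -k my-keypair -i ~/my-keypair.pem destroy learning-cluster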

On Fri, Apr 3, 2015 at 7:38 AM Akhil Das ak...@sigmoidanalytics.com wrote:

 There isn't any specific Linux distro, but I would prefer Ubuntu for a
 beginner, as it's very easy to apt-get install stuff on it.

 Thanks
 Best Regards

 On Fri, Apr 3, 2015 at 4:58 PM, Horsmann, Tobias 
 tobias.horsm...@uni-due.de wrote:

  Hi,
 Are there any recommendations for operating systems that one should use
 for setting up Spark/Hadoop nodes in general?
 I am not familiar with the differences between the various Linux
 distributions or how well they are (not) suited for cluster set-ups, so I
 wondered if there are some preferred choices?

  Regards,





Re: Connection pooling in spark jobs

2015-04-02 Thread Charles Feduke
How long does each executor keep the connection open for? How many
connections does each executor open?

Are you certain that connection pooling is a performant and suitable
solution? Are you running out of resources on the database server and
cannot tolerate each executor having a single connection?

If you need a solution that limits the number of open connections [resource
starvation on the DB server] I think you'd have to fake it with a
centralized counter of active connections, and logic within each executor
that blocks when the counter is at a given threshold. If the counter is not
at threshold, then an active connection can be created (after incrementing
the shared counter). You could use something like ZooKeeper to store the
counter value. This would have the overall effect of decreasing performance
if your required number of connections outstrips the database's resources.
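
(If you did go that route, ZooKeeper's recipes already cover the
counter-plus-blocking logic; Curator's shared semaphore is essentially that.
A rough sketch only - the quorum address, znode path, JDBC URL, and the limit
of 20 are all made up:)

import java.sql.DriverManager
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2
import org.apache.curator.retry.ExponentialBackoffRetry

rdd.foreachPartition { rows =>
  val zk = CuratorFrameworkFactory.newClient("zk1:2181", new ExponentialBackoffRetry(1000, 3))
  zk.start()
  // at most 20 leases (open DB connections) across the whole cluster
  val semaphore = new InterProcessSemaphoreV2(zk, "/spark/db-connections", 20)
  val lease = semaphore.acquire() // blocks while all leases are taken
  val conn = DriverManager.getConnection("jdbc:oracle:thin:@dbhost:1521/orcl")
  try {
    rows.foreach { row => /* query with conn */ }
  } finally {
    conn.close()
    semaphore.returnLease(lease)
    zk.close()
  }
}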

On Fri, Apr 3, 2015 at 12:22 AM Sateesh Kavuri sateesh.kav...@gmail.com
wrote:

 But this basically means that the pool is confined to the job (of a single
 app) in question, but is not sharable across multiple apps?
 The setup we have is a job server (the spark-jobserver) that creates jobs.
 Currently, we have each job opening and closing a connection to the
 database. What we would like to achieve is for each of the jobs to obtain a
 connection from a db pool

 Any directions on how this can be achieved?

 --
 Sateesh

 On Thu, Apr 2, 2015 at 7:00 PM, Cody Koeninger c...@koeninger.org wrote:

 Connection pools aren't serializable, so you generally need to set them
 up inside of a closure.  Doing that for every item is wasteful, so you
 typically want to use mapPartitions or foreachPartition

 rdd.mapPartitions { part =>
   setupPool
   part.map { ...



 See Design Patterns for using foreachRDD in
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams

 On Thu, Apr 2, 2015 at 7:52 AM, Sateesh Kavuri sateesh.kav...@gmail.com
 wrote:

 Right, I am aware of how to use connection pooling with Oracle, but the
 specific question is how to use it in the context of Spark job execution
 On 2 Apr 2015 17:41, Ted Yu yuzhih...@gmail.com wrote:

 http://docs.oracle.com/cd/B10500_01/java.920/a96654/connpoca.htm

 The question doesn't seem to be Spark specific, btw




  On Apr 2, 2015, at 4:45 AM, Sateesh Kavuri sateesh.kav...@gmail.com
 wrote:
 
  Hi,
 
  We have a case that we will have to run concurrent jobs (for the same
 algorithm) on different data sets. And these jobs can run in parallel and
 each one of them would be fetching the data from the database.
  We would like to optimize the database connections by making use of
 connection pooling. Any suggestions / best known ways on how to achieve
 this. The database in question is Oracle
 
  Thanks,
  Sateesh






Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Charles Feduke
Assuming you are on Linux, what is your /etc/security/limits.conf set for
nofile/soft (number of open file handles)?
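
(You can check what the user running the executors is actually getting with:)

ulimit -Sn   # soft limit on open files
ulimit -Hn   # hard limit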

On Fri, Mar 20, 2015 at 3:29 PM Shuai Zheng szheng.c...@gmail.com wrote:

 Hi All,



 I try to run a simple sort by on 1.2.1. And it always give me below two
 errors:



 1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
 /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826
 (Too many open files)



 And then I switch to:

 conf.set("spark.shuffle.consolidateFiles", "true")

 .set("spark.shuffle.manager", "SORT")



 Then I get the error:



 Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent
 failure: Lost task 5.3 in stage 1.0 (TID 36,
 ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException:
 java.io.IOException: File too large

 at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)



 I roughly know the first issue is because Spark shuffle creates too many
 local temp files (and I don’t know the solution, because it looks like my
 workaround also causes other issues), but I am not sure what the second
 error means.



 Anyone knows the solution for both cases?



 Regards,



 Shuai



Re: Writing Spark Streaming Programs

2015-03-19 Thread Charles Feduke
Scala is the language used to write Spark so there's never a situation in
which features introduced in a newer version of Spark cannot be taken
advantage of if you write your code in Scala. (This is mostly true of Java
as well, but it may take a little more legwork if a Java-friendly adapter
isn't available alongside new features.)

Scala is also OO; it's a functional/OO hybrid language.

Although much of my organization's codebase is written in Java and we've
recently transitioned to Java 8 I still write all of my Spark code using
Scala. (I also squeeze in Scala where I can in other parts of the
organization.) Additionally I use both Python and R for local data
analysis, though I haven't used Python with Spark in production.

On Thu, Mar 19, 2015 at 10:51 AM James King jakwebin...@gmail.com wrote:

 Hello All,

 I'm using Spark for streaming but I'm unclear on which implementation
 language to use: Java, Scala or Python.

 I don't know anything about Python, familiar with Scala and have been
 doing Java for a long time.

 I think the above shouldn't influence my decision on which language to use
 because I believe the tool should fit the problem.

 In terms of performance Java and Scala are comparable. However Java is OO
 and Scala is FP, no idea what Python is.

 If using Scala and not applying a consistent style of programming Scala
 code can become unreadable, but I do like the fact it seems to be possible
 to do so much work with so much less code, that's a strong selling point
 for me. Also it could be that the type of programming done in Spark is best
 implemented in Scala as FP language, not sure though.

 The question I would like your good help with is are there any other
 considerations I need to think about when deciding this? are there any
 recommendations you can make in regards to this?

 Regards
 jk









Re: Spark History server default conf values

2015-03-10 Thread Charles Feduke
What I found from a quick search of the Spark source code (from my local
snapshot on January 25, 2015):

// Interval between each check for event log updates
private val UPDATE_INTERVAL_MS =
  conf.getInt("spark.history.fs.updateInterval",
    conf.getInt("spark.history.updateInterval", 10)) * 1000

private val retainedApplications =
  conf.getInt("spark.history.retainedApplications", 50)
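
So the defaults work out to 10 seconds and 50 retained applications. If you
want different values you have to set them explicitly; a minimal sketch,
assuming you start the history server with the usual spark-defaults.conf /
SPARK_HISTORY_OPTS mechanism (the values below are just examples):

spark.history.fs.updateInterval      30
spark.history.retainedApplications   100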


On Tue, Mar 10, 2015 at 12:37 AM Srini Karri skarri@gmail.com wrote:

 Hi All,

 What are the default values for the following conf properities if we don't
 set in the conf file?

 # spark.history.fs.updateInterval 10

 # spark.history.retainedApplications 500


 Regards,

 Srini.



Re: Spark on EC2

2015-02-24 Thread Charles Feduke
This should help you understand the cost of running a Spark cluster for a
short period of time:

http://www.ec2instances.info/

If you run an instance for even 1 second of a single hour you are charged
for that complete hour. So before you shut down your miniature cluster make
sure you really are done with what you want to do, as firing up the cluster
again will be like using an extra hour's worth of time.

The purpose of EC2's free tier is to get you to buy into AWS services.
At the free level it's not terribly useful except for the simplest of
web applications (which you could host for free on Heroku, which also uses
AWS) or simple long running but largely dormant shell processes.

On Tue Feb 24 2015 at 10:16:56 AM Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Thank You Sean.
 I was just trying to experiment with the performance of Spark applications
 with various worker instances (I hope you remember that we discussed the
 worker instances).
 I thought it would be a good one to try in EC2. So, it doesn't work out,
 does it?

 Thank You

 On Tue, Feb 24, 2015 at 8:40 PM, Sean Owen so...@cloudera.com wrote:

 The free tier includes 750 hours of t2.micro instance time per month.
 http://aws.amazon.com/free/

 That's basically a month of hours, so it's all free if you run one
 instance only at a time. If you run 4, you'll be able to run your
 cluster of 4 for about a week free.

 A t2.micro has 1GB of memory, which is small but something you could
 possibly get work done with.

 However it provides only burst CPU. You can only use about 10% of 1
 vCPU continuously due to capping. Imagine this as about 1/10th of 1
 core on your laptop. It would be incredibly slow.

 This is not to mention the network and I/O bottleneck you're likely to
 run into as you don't get much provisioning with these free instances.

 So, no you really can't use this for anything that is at all CPU
 intensive. It's for, say, running a low-traffic web service.

 On Tue, Feb 24, 2015 at 2:55 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:
  Hi,
  I have just signed up for Amazon AWS because I learnt that it provides
  service for free for the first 12 months.
  I want to run Spark on EC2 cluster. Will they charge me for this?
 
  Thank You





Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-17 Thread Charles Feduke
Emre,

As you are keeping the properties file external to the JAR you need to make
sure to submit the properties file as an additional --files (or whatever
the necessary CLI switch is) so all the executors get a copy of the file
along with the JAR.

If you know you are going to just put the properties file on HDFS then why
don't you define a custom system setting like properties.url and pass it
along:

(this is for Spark shell, the only CLI string I have available at the
moment:)

spark-shell --jars $JAR_NAME \
--conf 'properties.url=hdfs://config/stuff.properties' \
--conf 'spark.executor.extraJavaOptions=-Dproperties.url=hdfs://config/stuff.properties'

... then load the properties file during initialization by examining the
properties.url system setting.
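
(Roughly, in your initialization code - with the caveat that a plain
java.net.URL won't open an hdfs:// URL unless Hadoop's URL stream handler is
registered, so for HDFS you'd read it through a Hadoop FileSystem instead:)

import java.net.URL
import java.util.Properties

val props = new Properties()
val in = new URL(System.getProperty("properties.url")).openStream()
try props.load(in) finally in.close()

val outputDir = props.getProperty("job.output.dir") // or whatever keys you need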

I'd still strongly recommend Typesafe Config as it makes this a lot less
painful, and I know for certain you can place your *.conf at a URL (using
the -Dconfig.url=) though it probably won't work with an HDFS URL.


On Tue Feb 17 2015 at 9:53:08 AM Gerard Maas gerard.m...@gmail.com wrote:

 +1 for TypeSafe config
 Our practice is to include all spark properties under a 'spark' entry in
 the config file alongside job-specific configuration:

 A config file would look like:
 spark {
   master = 
   cleaner.ttl = 123456
   ...
 }
 job {
   context {
     src = foo
     action = barAction
   }
   prop1 = val1
 }

 Then, to create our Spark context, we transparently pass the spark section
 to a SparkConf instance.
 This idiom will instantiate the context with the spark specific
 configuration:


 sparkConfig.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))

 And we can make use of the config object everywhere else.
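
 (configToStringSeq here is our own helper, not part of Spark or Typesafe
 Config; one plausible shape for it is:)

 import com.typesafe.config.Config
 import scala.collection.JavaConverters._

 def configToStringSeq(config: Config): Seq[(String, String)] =
   config.entrySet().asScala.toSeq
     .map(e => e.getKey -> e.getValue.unwrapped().toString)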

 We use the override model of the typesafe config: reasonable defaults go
 in the reference.conf (within the jar). Environment-specific overrides go
 in the application.conf (alongside the job jar) and hacks are passed with
 -Dprop=value :-)


 -kr, Gerard.


 On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 I've decided to try

   spark-submit ... --conf
 spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties

 But when I try to retrieve the value of propertiesFile via

 System.err.println("propertiesFile : " +
  System.getProperty("propertiesFile"));

 I get NULL:

propertiesFile : null

 Interestingly, when I run spark-submit with --verbose, I see that it
 prints:

   spark.driver.extraJavaOptions ->
 -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties

 I couldn't understand why I couldn't get to the value of propertiesFile
 by using standard System.getProperty method. (I can use new
 SparkConf().get("spark.driver.extraJavaOptions") and manually parse it,
 and retrieve the value, but I'd like to know why I cannot retrieve that
 value using System.getProperty method).

 Any ideas?

 If I can achieve what I've described above properly, I plan to pass a
 properties file that resides on HDFS, so that it will be available to my
 driver program wherever that program runs.

 --
 Emre




 On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com
  wrote:

 I haven't actually tried mixing non-Spark settings into the Spark
 properties. Instead I package my properties into the jar and use the
 Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
 specific) to get at my properties:

 Properties file: src/main/resources/integration.conf

 (below $ENV might be set to either integration or prod[3])

 ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
 --conf 'config.resource=$ENV.conf' \
 --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

 Since the properties file is packaged up with the JAR I don't have to
 worry about sending the file separately to all of the slave nodes. Typesafe
 Config is written in Java so it will work if you're not using Scala. (The
 Typesafe Config also has the advantage of being extremely easy to integrate
 with code that is using Java Properties today.)

 If you instead want to send the file separately from the JAR and you use
 the Typesafe Config library, you can specify config.file instead of
 .resource; though I'd point you to [3] below if you want to make your
 development life easier.

 1. https://github.com/typesafehub/config
 2. https://github.com/ceedubs/ficus
 3.
 http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/



 On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I'm using Spark 1.2.1 and have a module.properties file, and in it I
 have non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

 I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode
 client --verbose --properties-file /home/emre/data/mymodule.properties
 mymodule.jar

 And I

Re: Spark newbie desires feedback on first program

2015-02-16 Thread Charles Feduke
I cannot comment about the correctness of Python code. I will assume your
caper_kv is keyed on something that uniquely identifies all the rows that
make up the person's record so your group by key makes sense, as does the
map. (I will also assume all of the rows that comprise a single person's
record will always fit in memory. If not you will need another approach.)

You should be able to get away with removing the localhost:9000 from your
HDFS URL, i.e., hdfs:///sma/processJSON/people and let your HDFS
configuration for Spark supply the missing pieces.

On Mon Feb 16 2015 at 3:38:31 PM Eric Bell e...@ericjbell.com wrote:

 I'm a spark newbie working on his first attempt to write an ETL
 program. I could use some feedback to make sure I'm on the right path.
 I've written a basic proof of concept that runs without errors and seems
 to work, although I might be missing some issues when this is actually
 run on more than a single node.

 I am working with data about people (actually healthcare patients). I
 have an RDD that contains multiple rows per person. My overall goal is
 to create a single Person object for each person in my data. In this
 example, I am serializing to JSON, mostly because this is what I know
 how to do at the moment.

 Other than general feedback, is my use of the groupByKey() and
 mapValues() methods appropriate?

 Thanks!


 import json

 class Person:
     def __init__(self):
         self.mydata = {}
         self.cpts = []
         self.mydata['cpt'] = self.cpts

     def addRowData(self, dataRow):
         # Get the CPT codes
         cpt = dataRow.CPT_1
         if cpt:
             self.cpts.append(cpt)

     def serializeToJSON(self):
         return json.dumps(self.mydata)

 def makeAPerson(rows):
     person = Person()
     for row in rows:
         person.addRowData(row)
     return person.serializeToJSON()

 peopleRDD = caper_kv.groupByKey().mapValues(lambda personDataRows: makeAPerson(personDataRows))
 peopleRDD.saveAsTextFile("hdfs://localhost:9000/sma/processJSON/people")


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark newbie desires feedback on first program

2015-02-16 Thread Charles Feduke
My first problem was somewhat similar to yours. You won't find a whole lot
of JDBC-to-Spark examples since I think a lot of the adoption for Spark is
from teams that are already experienced with Hadoop and already have an
established big data solution (so their data is already extracted from
whatever sources, e.g., log files, Hive, other M/R jobs). JDBC support is
somewhat... lacking.

Our application uses a 12 node PostgreSQL distributed RDBMS that is sharded
at the application tier. I had to write my own JDBC RDD to support this
logical schema. However because you are coming from a single MySQL DB you
should be able to get away with using the JdbcRDD[1]... but I cannot find a
reference to it for the Python API so someone familiar with using Python
and Spark will have to chime in on that.

You need to consider _how_ the data gets from MySQL to the workers. It
might work to pull all of the data to a single node and then parallelize
that data across the cluster, but it's not going to be as efficient as range
querying from each worker in the cluster to the database. If you're working
with TBs of data then you will see very big benefits by distributing the
data across workers from the get go; if you don't it will take however long
it takes to copy all the data to a single worker and distribute as your
startup code for each execution. (By range querying what I mean is
basically what the JdbcRDD does - it forces you to include a conditional
statement like id > ? AND id <= ? in your SQL, which it formats at each
worker so each worker only gets a piece of the pie.) The JdbcRDD makes
assumptions about numeric keys for range querying.
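
(For reference, a rough sketch of what that looks like with the JdbcRDD; the
connection string, table, columns, and bounds are all made up:)

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val people = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost/clinic", "user", "pass"),
  "SELECT id, cpt_1 FROM person_rows WHERE id >= ? AND id <= ?",
  lowerBound = 1L,
  upperBound = 10000000L,
  numPartitions = 20,
  mapRow = r => (r.getLong("id"), r.getString("cpt_1"))
)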

The next thing to consider is if you're going against your production
database, will massive reads cause degradation for production users? I am
using read replicas to mitigate this for our production installation, as
copying TBs of data out of PostgreSQL would have some negative effect on
our users. Running your jobs during low traffic is obviously an option
here, as is restoring a read-only version from backup and explicitly
querying that instance (in which case parallelizing user IDs and querying
MySQL directly might get you near to the JdbcRDD behavior). And of course
if the MySQL instance is already your analytics solution then query on.

1.
https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/rdd/JdbcRDD.html

On Mon Feb 16 2015 at 4:42:30 PM Eric Bell e...@ericjbell.com wrote:

 Thanks Charles. I just realized a few minutes ago that I neglected to
 show the step where I generated the key on the person ID. Thanks for the
 pointer on the HDFS URL.

 Next step is to process data from multiple RDDS. My data originates from
 7 tables in a MySQL database. I used sqoop to create avro files from
 these tables, and in turn created RDDs using SparkSQL from the avro
 files. Since the groupByKey only operates on a single RDD, I'm not quite
 sure yet how I'm going to process 7 tables as a transformation to get
 all the data I need into my objects.

 I'm vacillating on whether I should be doing it this way, or if it
 would be a lot simpler to query MySQL to get all the Person IDs,
 parallelize them, and have my Person class make queries directly to the
 MySQL database. Since in theory I only have to do this once, I'm not
 sure there's much to be gained in moving the data from MySQL to Spark
 first.

 I have yet to find any non-trivial examples of ETL logic on the web ...
 it seems like it's mostly word count map-reduce replacements.

 On 02/16/2015 01:32 PM, Charles Feduke wrote:
  I cannot comment about the correctness of Python code. I will assume
  your caper_kv is keyed on something that uniquely identifies all the
  rows that make up the person's record so your group by key makes
  sense, as does the map. (I will also assume all of the rows that
  comprise a single person's record will always fit in memory. If not
  you will need another approach.)
 
  You should be able to get away with removing the localhost:9000 from
  your HDFS URL, i.e., hdfs:///sma/processJSON/people and let your
  HDFS configuration for Spark supply the missing pieces.
 




Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Charles Feduke
I haven't actually tried mixing non-Spark settings into the Spark
properties. Instead I package my properties into the jar and use the
Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
specific) to get at my properties:

Properties file: src/main/resources/integration.conf

(below $ENV might be set to either integration or prod[3])

ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
--conf 'config.resource=$ENV.conf' \
--conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

Since the properties file is packaged up with the JAR I don't have to worry
about sending the file separately to all of the slave nodes. Typesafe
Config is written in Java so it will work if you're not using Scala. (The
Typesafe Config also has the advantage of being extremely easy to integrate
with code that is using Java Properties today.)

If you instead want to send the file separately from the JAR and you use
the Typesafe Config library, you can specify config.file instead of
.resource; though I'd point you to [3] below if you want to make your
development life easier.

1. https://github.com/typesafehub/config
2. https://github.com/ceedubs/ficus
3.
http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/



On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I'm using Spark 1.2.1 and have a module.properties file, and in it I have
 non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

 I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode
 client --verbose --properties-file /home/emre/data/mymodule.properties
 mymodule.jar

 And I thought I could read the value of my non-Spark property, namely,
 job.output.dir by using:

 SparkConf sparkConf = new SparkConf();
 final String validatedJSONoutputDir = sparkConf.get("job.output.dir");

 But it gives me an exception:

 Exception in thread "main" java.util.NoSuchElementException:
 job.output.dir

 Is it not possible to mix Spark and non-Spark properties in a single
 .properties file, then pass it via --properties-file and then get the
 values of those non-Spark properties via SparkConf?

 Or is there another object / method to retrieve the values for those
 non-Spark properties?


 --
 Emre Sevinç



Re: SPARK_LOCAL_DIRS Issue

2015-02-11 Thread Charles Feduke
A central location, such as NFS?

If they are temporary for the purpose of further job processing you'll want
to keep them local to the node in the cluster, i.e., in /tmp. If they are
centralized you won't be able to take advantage of data locality and the
central file store will become a bottleneck for further processing.

If /tmp isn't an option because you want to be able to monitor the file
outputs as they occur you can also use HDFS (assuming your Spark nodes are
also HDFS members they will benefit from data locality).

It looks like the problem you are seeing is that a lock cannot be acquired
on the output file in the central file system.

On Wed Feb 11 2015 at 11:55:55 AM TJ Klein tjkl...@gmail.com wrote:

 Hi,

 Using Spark 1.2 I ran into issues setting SPARK_LOCAL_DIRS to a different
 path than the local directory.

 On our cluster we have a folder for temporary files (in a central file
 system), which is called /scratch.

 When setting SPARK_LOCAL_DIRS=/scratch/<node name>

 I get:

  An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0
 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
 0.0
 (TID 3, XXX): java.io.IOException: Function not implemented
 at sun.nio.ch.FileDispatcherImpl.lock0(Native Method)
 at sun.nio.ch.FileDispatcherImpl.lock(FileDispatcherImpl.java:91)
 at sun.nio.ch.FileChannelImpl.lock(FileChannelImpl.java:1022)
 at java.nio.channels.FileChannel.lock(FileChannel.java:1052)
 at org.apache.spark.util.Utils$.fetchFile(Utils.scala:379)

 Using SPARK_LOCAL_DIRS=/tmp, however, works perfectly. Any idea?

 Best,
  Tassilo





 --
 View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-LOCAL-DIRS-Issue-tp21602.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SPARK_LOCAL_DIRS Issue

2015-02-11 Thread Charles Feduke
Take a look at this:

http://wiki.lustre.org/index.php/Running_Hadoop_with_Lustre

Particularly: http://wiki.lustre.org/images/1/1b/Hadoop_wp_v0.4.2.pdf
(linked from that article)

to get a better idea of what your options are.

If it's possible to avoid writing to [any] disk I'd recommend that route,
since that's the performance advantage Spark has over vanilla Hadoop.

On Wed Feb 11 2015 at 2:10:36 PM Tassilo Klein tjkl...@gmail.com wrote:

 Thanks for the info. The file system in use is a Lustre file system.

 Best,
  Tassilo

 On Wed, Feb 11, 2015 at 12:15 PM, Charles Feduke charles.fed...@gmail.com
  wrote:

 A central location, such as NFS?

 If they are temporary for the purpose of further job processing you'll
 want to keep them local to the node in the cluster, i.e., in /tmp. If they
 are centralized you won't be able to take advantage of data locality and
 the central file store will become a bottleneck for further processing.

 If /tmp isn't an option because you want to be able to monitor the file
 outputs as they occur you can also use HDFS (assuming your Spark nodes are
 also HDFS members they will benefit from data locality).

 It looks like the problem you are seeing is that a lock cannot be
 acquired on the output file in the central file system.

 On Wed Feb 11 2015 at 11:55:55 AM TJ Klein tjkl...@gmail.com wrote:

 Hi,

 Using Spark 1.2 I ran into issues setting SPARK_LOCAL_DIRS to a different
 path than the local directory.

 On our cluster we have a folder for temporary files (in a central file
 system), which is called /scratch.

 When setting SPARK_LOCAL_DIRS=/scratch/<node name>

 I get:

  An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
 : org.apache.spark.SparkException: Job aborted due to stage failure:
 Task 0
 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
 0.0
 (TID 3, XXX): java.io.IOException: Function not implemented
 at sun.nio.ch.FileDispatcherImpl.lock0(Native Method)
 at sun.nio.ch.FileDispatcherImpl.lock(FileDispatcherImpl.java:
 91)
 at sun.nio.ch.FileChannelImpl.lock(FileChannelImpl.java:1022)
 at java.nio.channels.FileChannel.lock(FileChannel.java:1052)
 at org.apache.spark.util.Utils$.fetchFile(Utils.scala:379)

 Using SPARK_LOCAL_DIRS=/tmp, however, works perfectly. Any idea?

 Best,
  Tassilo





 --
 View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-LOCAL-DIRS-Issue-tp21602.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Parsing CSV files in Spark

2015-02-06 Thread Charles Feduke
I've been doing a bunch of work with CSVs in Spark, mostly saving them as a
merged CSV (instead of the various part-n files). You might find the
following links useful:

- This article is about combining the part files and outputting a header as
the first line in the merged results:

http://java.dzone.com/articles/spark-write-csv-file-header

- This was my take on the previous author's original article, but it
doesn't yet handle the header row:

http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/

spark-csv helps with reading CSV data and mapping a schema for Spark SQL,
but as of now doesn't save CSV data.
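
(If you end up on the manual route Sean describes below, it looks roughly like
this; the paths and column indices are placeholders, and it assumes every file
shares the same header and no field contains embedded newlines:)

import java.io.StringReader
import org.apache.commons.csv.CSVFormat

val lines = sc.textFile("hdfs:///data/csv/*.csv")
val header = lines.first()
val selected = lines
  .filter(_ != header) // drop the header line(s)
  .map { line =>
    val record = CSVFormat.DEFAULT.parse(new StringReader(line)).iterator().next()
    (record.get(0), record.get(3)) // keep only the columns you care about
  }
selected.map { case (a, b) => s"$a,$b" }.saveAsTextFile("hdfs:///data/csv-out")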

On Fri Feb 06 2015 at 9:49:06 AM Sean Owen so...@cloudera.com wrote:

 You can do this manually without much trouble: get your files on a
 distributed store like HDFS, read them with textFile, filter out
 headers, parse with a CSV library like Commons CSV, select columns,
 format and store the result. That's tens of lines of code.

 However you probably want to start by looking at
 https://github.com/databricks/spark-csv which may make it even easier
 than that and give you a richer query syntax.

 On Fri, Feb 6, 2015 at 8:37 AM, Spico Florin spicoflo...@gmail.com
 wrote:
  Hi!
   I'm new to Spark. I have a case study where the data is stored in CSV
   files. These files have headers with more than 1000 columns. I would like
   to know the best practices for parsing them, in particular the
   following points:
  1. Getting and parsing all the files from a folder
  2. What CSV parser do you use?
   3. I would like to select just some columns whose names match a pattern
   and then pass the selected columns' values (plus the column names) to the
   processing and save the output to a CSV (preserving the selected
   columns).
 
  If you have any experience with some points above, it will be really
 helpful
  (for me and for the others that will encounter the same cases) if you can
  share your thoughts.
  Thanks.
Regards,
   Florin
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark streaming from kafka real time + batch processing in java

2015-02-06 Thread Charles Feduke
Good questions, some of which I'd like to know the answer to.

  Is it okay to update a NoSQL DB with aggregated counts per batch
interval or is it generally stored in hdfs?

This depends on how you are going to use the aggregate data.

1. Is there a lot of data? If so, and you are going to use the data as
inputs to another job, it might benefit from being distributed across the
cluster on HDFS (for data locality).
2. Usually when speaking about aggregates there is substantially less
data, in which case storing that data in another datastore is okay. If
you're talking about a few thousand rows, and having them in something like
Mongo or Postgres makes your life easier (reporting software, for example),
then it's okay to just store the results in another data store, even if you
use them as inputs to another job. If the data will grow unbounded over
time this might not be a good solution (in which case refer to #1).



On Fri Feb 06 2015 at 6:16:39 AM Mohit Durgapal durgapalmo...@gmail.com
wrote:

 I want to write a spark streaming consumer for kafka in java. I want to
 process the data in real-time as well as store the data in hdfs in
 year/month/day/hour/ format. I am not sure how to achieve this. Should I
 write separate kafka consumers, one for writing data to HDFS and one for
 spark streaming?

 Also I would like to ask what do people generally do with the result of
 spark streams after aggregating over it? Is it okay to update a NoSQL DB
 with aggregated counts per batch interval or is it generally stored in hdfs?

 Is it possible to store the mini batch data from spark streaming to HDFS
 in a way that the data is aggregated  hourly and put into HDFS in its
 hour folder. I would not want a lot of small files equal to the mini
 batches of spark per hour, that would be inefficient for running hadoop
 jobs later.

 Is anyone working on the same problem?

 Any help and comments would be great.


 Regards

 Mohit



Re: How do I set spark.local.dirs?

2015-02-06 Thread Charles Feduke
Did you restart the slaves so they would read the settings? You don't need
to start/stop the EC2 cluster, just the slaves. From the master node:

$SPARK_HOME/sbin/stop-slaves.sh
$SPARK_HOME/sbin/start-slaves.sh

($SPARK_HOME is probably /root/spark)

On Fri Feb 06 2015 at 10:31:18 AM Joe Wass jw...@crossref.org wrote:

 I'm running on EC2 and I want to set the directory to use on the slaves
 (mounted EBS volumes).

 I have set:
 spark.local.dir /vol3/my-spark-dir
 in
/root/spark/conf/spark-defaults.conf

 and replicated to all nodes. I have verified that in the console the value
 in the config corresponds. I have checked that these values are present in
 nodes.

 But it's still creating temp files in the wrong (default) place:

 /mnt2/spark

 How do I get my slaves to pick up this value? How can I verify that they
 have?

 Thanks!

 Joe



Re: spark on ec2

2015-02-05 Thread Charles Feduke
I don't see anything that says you must explicitly restart them to load the
new settings, but usually there is some sort of signal trapped [or brute
force full restart] to get a configuration reload for most daemons. I'd
take a guess and use the $SPARK_HOME/sbin/{stop,start}-slaves.sh scripts on
your master node and see. (
http://spark.apache.org/docs/1.2.0/spark-standalone.html#cluster-launch-scripts
)

I just tested this out on my integration EC2 cluster and got odd results
for stopping the workers (no workers found) but the start script... seemed
to work. My integration cluster was running and functioning after executing
both scripts, but I also didn't make any changes to spark-env either.

On Thu Feb 05 2015 at 9:49:49 PM Kane Kim kane.ist...@gmail.com wrote:

 Hi,

 I'm trying to change setting as described here:
 http://spark.apache.org/docs/1.2.0/ec2-scripts.html
 export SPARK_WORKER_CORES=6

 Then I ran  ~/spark-ec2/copy-dir /root/spark/conf to distribute to
 slaves, but without any effect. Do I have to restart workers?
 How to do that with spark-ec2?

 Thanks.



Re: How to design a long live spark application

2015-02-05 Thread Charles Feduke
If you want to design something like Spark shell have a look at:

http://zeppelin-project.org/

It's open source and may already do what you need. If not, its source code
will be helpful in answering your questions about how to integrate with
long-running jobs.

On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com wrote:

 You can check out https://github.com/spark-jobserver/spark-jobserver -
 this allows several users to upload their jars and run jobs with a REST
 interface.

 However, if all users are using the same functionality, you can write a
 simple spray server which will act as the driver and hosts the spark
 context+RDDs, launched in client mode.

 On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com
 wrote:

 Hi All,



 I want to develop a server side application:



 User submits a request -> Server runs a Spark application and returns (this
 might take a few seconds).



 So I want to host the server to keep a long-lived context; I don’t know
 whether this is reasonable or not.



 Basically I try to have a global JavaSparkContext instance and keep it
 there, and initialize some RDD. Then my java application will use it to
 submit the job.



 So now I have some questions:



 1, if I don’t close it, will there be any timeout I need to configure on the
 spark server?

 2, In theory I want to design something similar to Spark shell (which
 also hosts a default sc there), just it is not shell based.



 Any suggestion? I think my request is very common for application
 development; surely someone has done it before?



 Regards,



 Shawn





Re: Writing RDD to a csv file

2015-02-03 Thread Charles Feduke
In case anyone needs to merge all of their part-n files (small result
set only) into a single *.csv file or needs to generically flatten case
classes, tuples, etc., into comma separated values:

http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/

On Tue Feb 03 2015 at 8:23:59 AM kundan kumar iitr.kun...@gmail.com wrote:

 Thanks Gerard !!

 This is working.

 On Tue, Feb 3, 2015 at 6:44 PM, Gerard Maas gerard.m...@gmail.com wrote:

 this is more of a scala question, so probably next time you'd like to
 address a Scala forum eg. http://stackoverflow.com/questions/tagged/scala

 val optArrStr: Option[Array[String]] = ???
 optArrStr.map(arr => arr.mkString(",")).getOrElse("")  // empty string or
 whatever default value you have for this.
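
 Putting that back into the original expression gives something like:

 myrdd.map { case (key, (arr, optArr)) =>
   key + "," + arr.mkString(",") + "," + optArr.map(_.mkString(",")).getOrElse("")
 }.saveAsTextFile("hdfs://...")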

 kr, Gerard.

 On Tue, Feb 3, 2015 at 2:09 PM, kundan kumar iitr.kun...@gmail.com
 wrote:

 I have a RDD which is of type

 org.apache.spark.rdd.RDD[(String, (Array[String],
 Option[Array[String]]))]

 I want to write it as a csv file.

 Please suggest how this can be done.

 myrdd.map(line => (line._1 + "," + line._2._1.mkString(",") + "," +
 line._2._2.mkString(","))).saveAsTextFile("hdfs://...")

 Doing mkString on line._2._1 works but does not work for the Option type.

 Please suggest how this can be done.


 Thanks
 Kundan







Re: groupByKey is not working

2015-01-30 Thread Charles Feduke
You'll still need to:

import org.apache.spark.SparkContext._

Importing org.apache.spark._ does _not_ recurse into sub-objects or
sub-packages, it only brings in whatever is at the level of the package or
object imported.

SparkContext._ has some implicits, one of them for adding groupByKey to an
RDD[_] IIRC.

On Fri Jan 30 2015 at 3:48:22 PM Stephen Boesch java...@gmail.com wrote:

 Amit - IJ will not find it until you add the import as Sean mentioned.  It
 includes implicits that intellij will not know about otherwise.

 2015-01-30 12:44 GMT-08:00 Amit Behera amit.bd...@gmail.com:

 I am sorry Sean.

 I am developing code in intelliJ Idea. so with the above dependencies I
 am not able to find *groupByKey* when I am searching by ctrl+space


 On Sat, Jan 31, 2015 at 2:04 AM, Sean Owen so...@cloudera.com wrote:

 When you post a question anywhere, and say it's not working, you
 *really* need to say what that means.


 On Fri, Jan 30, 2015 at 8:20 PM, Amit Behera amit.bd...@gmail.com
 wrote:
  hi all,
 
  my sbt file is like this:
 
  name := Spark
 
  version := 1.0
 
  scalaVersion := 2.10.4
 
  libraryDependencies += org.apache.spark %% spark-core % 1.1.0
 
  libraryDependencies += net.sf.opencsv % opencsv % 2.3
 
 
  code:
 
  object SparkJob
  {
 
def pLines(lines:Iterator[String])={
  val parser=new CSVParser()
  lines.map(l={val vs=parser.parseLine(l)
(vs(0),vs(1).toInt)})
}
 
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName(Spark
 Job).setMaster(local)
  val sc = new SparkContext(conf)
  val data = sc.textFile(/home/amit/testData.csv).cache()
  val result = data.mapPartitions(pLines).groupByKey
  //val list = result.filter(x= {(x._1).contains(24050881)})
 
}
 
  }
 
 
  Here groupByKey is not working . But same thing is working from
 spark-shell.
 
  Please help me
 
 
  Thanks
 
  Amit





Re: Serialized task result size exceeded

2015-01-30 Thread Charles Feduke
Are you using the default Java object serialization, or have you tried Kryo
yet? If you haven't tried Kryo please do and let me know how much it
impacts the serialization size. (I know it's more efficient, I'm curious to
know how much more efficient, and I'm being lazy - I don't have ~6K 500MB
files on hand.)

You can saveAsObjectFile on maybe a take(1) from an RDD and examine the
serialized output to see if maybe a much larger graph than you expect is
being output.
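
(Switching is just a configuration change; registering the classes your tasks
return is optional but usually shrinks things further. A sketch, with the
record class a stand-in for whatever your results actually are:)

import org.apache.spark.SparkConf

case class MyRecord(id: Long, payload: String) // placeholder for your result type

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyRecord]))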

On Fri Jan 30 2015 at 3:47:31 PM ankits ankitso...@gmail.com wrote:

 This is on spark 1.2

 I am loading ~6k parquet files, roughly 500 MB each into a schemaRDD, and
 calling count() on it.

 After loading about 2705 tasks (there is one per file), the job crashes
 with
 this error:
 Total size of serialized results of 2705 tasks (1024.0 MB) is bigger than
 spark.driver.maxResultSize (1024.0 MB)

 This indicates that the results of each task are about 2705/1024 = 2.6MB
 each. Is that normal? I don't know exactly what the result of each task
 would be, but 2.6 MB for each seems too high. Can anyone offer an
 explanation as to what the normal size should be if this is too high, or
 ways to reduce this?

 Thanks.



 --
 View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialized-task-result-size-exceeded-tp21449.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: groupByKey is not working

2015-01-30 Thread Charles Feduke
Define "not working". Not compiling? If so you need:

import org.apache.spark.SparkContext._


On Fri Jan 30 2015 at 3:21:45 PM Amit Behera amit.bd...@gmail.com wrote:

 hi all,

 my sbt file is like this:

 name := "Spark"

 version := "1.0"

 scalaVersion := "2.10.4"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"

 libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"


 *code:*

 object SparkJob
 {

   def pLines(lines: Iterator[String]) = {
     val parser = new CSVParser()
     lines.map(l => { val vs = parser.parseLine(l)
       (vs(0), vs(1).toInt) })
   }

   def main(args: Array[String]) {
     val conf = new SparkConf().setAppName("Spark Job").setMaster("local")
     val sc = new SparkContext(conf)
     val data = sc.textFile("/home/amit/testData.csv").cache()
     val result = data.mapPartitions(pLines).groupByKey
     //val list = result.filter(x => { (x._1).contains("24050881") })

   }

 }


 Here groupByKey is not working. But the same thing is working from *spark-shell*.

 Please help me


 Thanks

 Amit




Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con

2015-01-28 Thread Charles Feduke
I deal with problems like this so often across Java applications with large
dependency trees. Add the shell function at the following link to your
shell on the machine where your Spark Streaming is installed:

https://gist.github.com/cfeduke/fe63b12ab07f87e76b38

Then run in the directory where your JAR files are:

find-java-class SchemeRegistryFactory

(I know you said HttpClient but the error seems to be an overload or method
of SchemeRegistryFactory is missing from the class that is loaded by the
class loader. The class loader loads the first class it finds that matches
the package/class name coordinates.)

You'll then be able to zero in on the JAR that is bringing in an older
version of that class. Once you've done that you can exclude that JAR's
older dependency from within your pom.
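
(The exclusion itself is the usual Maven boilerplate; the groupId/artifactId
of the offender below are placeholders for whatever find-java-class turns up:)

<dependency>
  <groupId>some.group</groupId>
  <artifactId>offending-artifact</artifactId>
  <version>x.y.z</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
    </exclusion>
  </exclusions>
</dependency>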

If you find out that the newer version is incompatible you'll have to
perform some magic with the Maven shade plugin.


On Wed Jan 28 2015 at 8:00:22 AM Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I'm using *Spark 1.1.0* and *Solr 4.10.3*. I'm getting an exception when
 using *HttpSolrServer* from within Spark Streaming:

 15/01/28 13:42:52 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
 java.lang.NoSuchMethodError: 
 org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
   at 
 org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
   at 
 org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
   at 
 org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
   at 
 org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
   at 
 org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
   at 
 org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
   at 
 org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168)
   at 
 org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141)

 Normally, when I use my utility class that uses SolrJ to connect to a Solr
 server and run it by itself (running it stand-alone without Spark),
 everything works as expected. But when I invoke that utility class inside a
 Spark Streaming application, I get the exception above as soon as it is
 trying to establish a connection to the Solr server. My preliminary
 Internet search led me to believe that some Spark or Hadoop components
 bring an older version of *httpclient*, so I've tried to exclude them in
 my pom.xml.

 But I still get the same exception.

 Any ideas why? Or how can I fix it?

 When I analyze my pom.xml dependencies, I get:

 $ mvn dependency:tree -Ddetail=true | grep http
 [INFO] |  |  \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:provided
 [INFO] |  +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile
 [INFO] |  +- org.apache.httpcomponents:httpcore:jar:4.3:compile
 [INFO] |  +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile

 The whole dependency tree is:

 $ mvn dependency:tree -Ddetail=true
 [INFO] Scanning for projects...
 [INFO]
 [INFO] 
 
 [INFO] Building bigcontent 1.0-SNAPSHOT
 [INFO] 
 
 [INFO]
 [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ bigcontent ---
 [INFO] bigcontent:bigcontent:jar:1.0-SNAPSHOT
 [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.1.1:provided
 [INFO] |  +- org.apache.spark:spark-core_2.10:jar:1.1.1:provided
 [INFO] |  |  +- org.apache.curator:curator-recipes:jar:2.4.0:provided
 [INFO] |  |  |  \- org.apache.curator:curator-framework:jar:2.4.0:provided
 [INFO] |  |  | \- org.apache.curator:curator-client:jar:2.4.0:provided
 [INFO] |  |  +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:provided
 [INFO] |  |  |  +- 
 org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v201105210645:provided
 [INFO] |  |  |  +- 
 org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:provided
 [INFO] |  |  |  |  +- 
 org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:provided
 [INFO] |  |  |  |  \- 
 org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:provided
 [INFO] |  |  |  \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:provided
 [INFO] |  |  | \- 
 org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v201005082020:provided
 [INFO] |  |  |\- 
 org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105071233:provided
 [INFO] |  |  +- org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:provided
 [INFO] |  |  +- org.eclipse.jetty:jetty-util:jar:8.1.14.v20131031:provided
 [INFO] |  |  +- org.apache.commons:commons-lang3:jar:3.3.2:provided
 [INFO] |  |  +- org.slf4j:jul-to-slf4j:jar:1.7.5:provided
 [INFO] |  |  +- 

Re: spark 1.2 ec2 launch script hang

2015-01-28 Thread Charles Feduke
Yeah, I agree ~ should work. It could have been [read: probably was] the fact
that one of the EC2 hosts was already in my known_hosts (I don't know for
sure; I never saw an error message, but the behavior in that state is exactly
that: no error message), which I later fixed with Pete's patch. The second
execution, when things worked with an absolute path, may have worked simply
because the random hosts that came up on EC2 were never in my known_hosts.

On Wed Jan 28 2015 at 3:45:36 PM Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Hmm, I can’t see why using ~ would be problematic, especially if you
 confirm that echo ~/path/to/pem expands to the correct path to your
 identity file.

 If you have a simple reproduction of the problem, please send it over. I’d
 love to look into this. When I pass paths with ~ to spark-ec2 on my system,
 it works fine. I’m using bash, but zsh handles tilde expansion the same as
 bash.

 Nick
 ​

 On Wed Jan 28 2015 at 3:30:08 PM Charles Feduke charles.fed...@gmail.com
 wrote:

  It was only hanging when I specified the path with ~; I never tried a
  relative path.

  It was hanging at the "waiting for ssh to be ready on all hosts" stage. I
  let it sit for about 10 minutes, then found the StackOverflow answer that
  suggested specifying an absolute path, cancelled, and re-ran with --resume
  and the absolute path; all slaves were up within a couple of minutes.

 (I've stood up 4 integration clusters and 2 production clusters on EC2
 since with no problems.)

 On Wed Jan 28 2015 at 12:05:43 PM Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 Ey-chih,

 That makes more sense. This is a known issue that will be fixed as part
 of SPARK-5242 https://issues.apache.org/jira/browse/SPARK-5242.

 Charles,

 Thanks for the info. In your case, when does spark-ec2 hang? Only when
 the specified path to the identity file doesn't exist? Or also when you
 specify the path as a relative path or with ~?

 Nick


 On Wed Jan 28 2015 at 9:29:34 AM ey-chih chow eyc...@hotmail.com
 wrote:

  We found the problem and already fixed it. Basically, spark-ec2 requires
  EC2 instances to have external IP addresses. You need to specify this in
  the AWS console.
 --
 From: nicholas.cham...@gmail.com
 Date: Tue, 27 Jan 2015 17:19:21 +
 Subject: Re: spark 1.2 ec2 launch script hang
 To: charles.fed...@gmail.com; pzybr...@gmail.com; eyc...@hotmail.com
 CC: user@spark.apache.org


 For those who found that absolute vs. relative path for the pem file
 mattered, what OS and shell are you using? What version of Spark are you
 using?

 ~/ vs. absolute path shouldn’t matter. Your shell will expand the ~/
 to the absolute path before sending it to spark-ec2. (i.e. tilde
 expansion.)

 Absolute vs. relative path (e.g. ../../path/to/pem) also shouldn’t
 matter, since we fixed that for Spark 1.2.0
 https://issues.apache.org/jira/browse/SPARK-4137. Maybe there’s some
 case that we missed?

 Nick

 On Tue Jan 27 2015 at 10:10:29 AM Charles Feduke 
 charles.fed...@gmail.com wrote:


 Absolute path means no ~ and also verify that you have the path to the
 file correct. For some reason the Python code does not validate that the
 file exists and will hang (this is the same reason why ~ hangs).
 On Mon, Jan 26, 2015 at 10:08 PM Pete Zybrick pzybr...@gmail.com
 wrote:

 Try using an absolute path to the pem file



  On Jan 26, 2015, at 8:57 PM, ey-chih chow eyc...@hotmail.com wrote:
 
  Hi,
 
  I used the spark-ec2 script of spark 1.2 to launch a cluster.  I have
  modified the script according to
 
  https://github.com/grzegorz-dubicki/spark/commit/5dd8458d2ab9753aae939b3bb33be953e2c13a70
 
  But the script was still hung at the following message:
 
  Waiting for cluster to enter 'ssh-ready'
  state.
 
  Any additional thing I should do to make it succeed?  Thanks.
 
 
  Ey-Chih Chow
 
 
 
  --
  View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-ec2-launch-script-hang-tp21381.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 ​




Re: spark 1.2 ec2 launch script hang

2015-01-28 Thread Charles Feduke
It was only hanging when I specified the path with ~; I never tried a
relative path.

It was hanging at the "waiting for ssh to be ready on all hosts" stage. I let
it sit for about 10 minutes, then found the StackOverflow answer that
suggested specifying an absolute path, cancelled, and re-ran with --resume
and the absolute path; all slaves were up within a couple of minutes.

(I've stood up 4 integration clusters and 2 production clusters on EC2
since with no problems.)

On Wed Jan 28 2015 at 12:05:43 PM Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Ey-chih,

 That makes more sense. This is a known issue that will be fixed as part of
 SPARK-5242 https://issues.apache.org/jira/browse/SPARK-5242.

 Charles,

 Thanks for the info. In your case, when does spark-ec2 hang? Only when the
 specified path to the identity file doesn't exist? Or also when you specify
 the path as a relative path or with ~?

 Nick


 On Wed Jan 28 2015 at 9:29:34 AM ey-chih chow eyc...@hotmail.com wrote:

  We found the problem and already fixed it. Basically, spark-ec2 requires
  EC2 instances to have external IP addresses. You need to specify this in
  the AWS console.
 --
 From: nicholas.cham...@gmail.com
 Date: Tue, 27 Jan 2015 17:19:21 +
 Subject: Re: spark 1.2 ec2 launch script hang
 To: charles.fed...@gmail.com; pzybr...@gmail.com; eyc...@hotmail.com
 CC: user@spark.apache.org


 For those who found that absolute vs. relative path for the pem file
 mattered, what OS and shell are you using? What version of Spark are you
 using?

 ~/ vs. absolute path shouldn’t matter. Your shell will expand the ~/ to
 the absolute path before sending it to spark-ec2. (i.e. tilde expansion.)

 Absolute vs. relative path (e.g. ../../path/to/pem) also shouldn’t
 matter, since we fixed that for Spark 1.2.0
 https://issues.apache.org/jira/browse/SPARK-4137. Maybe there’s some
 case that we missed?

 Nick

 On Tue Jan 27 2015 at 10:10:29 AM Charles Feduke 
 charles.fed...@gmail.com wrote:


 Absolute path means no ~ and also verify that you have the path to the
 file correct. For some reason the Python code does not validate that the
 file exists and will hang (this is the same reason why ~ hangs).
 On Mon, Jan 26, 2015 at 10:08 PM Pete Zybrick pzybr...@gmail.com wrote:

 Try using an absolute path to the pem file



  On Jan 26, 2015, at 8:57 PM, ey-chih chow eyc...@hotmail.com wrote:
 
  Hi,
 
  I used the spark-ec2 script of spark 1.2 to launch a cluster.  I have
  modified the script according to
 
  https://github.com/grzegorz-dubicki/spark/commit/5dd8458d2ab9753aae939b3bb33be953e2c13a70
 
  But the script was still hung at the following message:
 
  Waiting for cluster to enter 'ssh-ready'
  state.
 
  Any additional thing I should do to make it succeed?  Thanks.
 
 
  Ey-Chih Chow
 
 
 
  --
  View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-ec2-launch-script-hang-tp21381.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 ​




Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con

2015-01-28 Thread Charles Feduke
It looks like you're shading in the Apache HttpComponents (HttpClient)
library, and it's a different version than what is expected. (Maybe 4.6.x,
based on the Javadoc.)

I see you are attempting to exclude commons-httpclient by using:

<exclusion>
  <groupId>commons-httpclient</groupId>
  <artifactId>*</artifactId>
</exclusion>

in your pom. However, what I think you really want is:

<exclusion>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
</exclusion>

The last time the groupId was commons-httpclient was Aug 2007 as version
3.1 (search.maven.com). I hope none of your dependencies rely on that
particular version. SchemeRegistryFactory was introduced in version 4.3.1
of httpcomponents so even if by chance one of them did rely on
commons-httpclient there wouldn't be a class conflict.


On Wed Jan 28 2015 at 9:19:20 AM Emre Sevinc emre.sev...@gmail.com wrote:

 This is what I get:

  ./bigcontent-1.0-SNAPSHOT.jar:org/apache/http/impl/conn/SchemeRegistryFactory.class

 (probably because I'm using a self-contained JAR).

 In other words, I'm still stuck.

 --
 Emre


 On Wed, Jan 28, 2015 at 2:47 PM, Charles Feduke charles.fed...@gmail.com
 wrote:

 I deal with problems like this so often across Java applications with
 large dependency trees. Add the shell function at the following link to
 your shell on the machine where your Spark Streaming is installed:

 https://gist.github.com/cfeduke/fe63b12ab07f87e76b38

 Then run in the directory where your JAR files are:

 find-java-class SchemeRegistryFactory

  (I know you said HttpClient, but the error indicates that an overload or
  method of SchemeRegistryFactory is missing from the class that is loaded by
  the class loader. The class loader loads the first class it finds that
  matches the package/class name coordinates.)

  You'll then be able to zero in on the JAR that is bringing in an older
  version of that class. Once you've done that you can exclude that JAR's
  older dependency from within your pom.

 If you find out that the newer version is incompatible you'll have to
 perform some magic with the Maven shade plugin.
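
 A quicker alternative, if you'd rather check from inside the running app
 than grep JARs on disk, is to ask the JVM where it loaded the class from. A
 minimal Scala sketch (the class name is taken from your stack trace):

 // Print the code source (JAR or directory) the class was actually loaded from.
 val clazz = Class.forName("org.apache.http.impl.conn.SchemeRegistryFactory")
 val source = Option(clazz.getProtectionDomain.getCodeSource)
   .map(_.getLocation.toString)
   .getOrElse("(no code source - probably a bootstrap or shaded class)")
 println("SchemeRegistryFactory loaded from: " + source)

 Run that inside the Spark Streaming app itself so it goes through the same
 class loader that is producing the NoSuchMethodError.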


 On Wed Jan 28 2015 at 8:00:22 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I'm using *Spark 1.1.0* and *Solr 4.10.3*. I'm getting an exception
 when using *HttpSolrServer* from within Spark Streaming:

 15/01/28 13:42:52 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
  java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
    at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
    at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
    at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
    at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
    at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
    at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
    at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
    at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)

 Normally, when I use my utility class that uses SolrJ to connect to a
 Solr server and run it by itself (running it stand-alone without Spark),
  everything works as expected. But when I invoke that utility class inside a
  Spark Streaming application, I get the exception above as soon as it tries
  to establish a connection to the Solr server. My preliminary Internet
  search led me to believe that some Spark or Hadoop components bring in an
  older version of *httpclient*, so I've tried to exclude them in my pom.xml.

 But I still get the same exception.

 Any ideas why? Or how can I fix it?

 When I analyze my pom.xml dependencies, I get:

 $ mvn dependency:tree -Ddetail=true | grep http
 [INFO] |  |  \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:provided
 [INFO] |  +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile
 [INFO] |  +- org.apache.httpcomponents:httpcore:jar:4.3:compile
 [INFO] |  +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile

 The whole dependency tree is:

 $ mvn dependency:tree -Ddetail=true
 [INFO] Scanning for projects...
 [INFO]
  [INFO] ------------------------------------------------------------------------
  [INFO] Building bigcontent 1.0-SNAPSHOT
  [INFO] ------------------------------------------------------------------------
 [INFO]
 [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ bigcontent ---
 [INFO] bigcontent:bigcontent:jar:1.0-SNAPSHOT
 [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.1.1:provided
 [INFO] |  +- org.apache.spark:spark-core_2.10:jar:1.1.1:provided
 [INFO] |  |  +- org.apache.curator:curator-recipes:jar

Re: Spark and S3 server side encryption

2015-01-28 Thread Charles Feduke
I have been trying to work around a similar problem with my Typesafe config
*.conf files seemingly not appearing on the executors. (Though now that I
think about it, it's not because the files are absent in the JAR, but because
the -Dconf.resource system property I pass to the master obviously doesn't
get relayed to the workers.)

What happens if you do something like this:

nohup ./bin/spark-submit --verbose --jars lib/app.jar \
  --master spark://master-amazonaws.com:7077 \
  --class com.elsevier.spark.SparkSync \
  --conf "spark.executor.extraJavaOptions=-Ds3service.server-side-encryption=AES256" \
  lib/app.jar > out.log &

(I bet this will fix my problem too.)
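
If editing the submit command isn't convenient, the same executor option can
also be set programmatically on the driver before the context is created. A
sketch, assuming the jets3t s3service.server-side-encryption property is the
one your Hadoop/jets3t version actually reads:

// Set the executor JVM system property from the driver instead of on the CLI.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("s3-sse-example") // hypothetical app name
  .set("spark.executor.extraJavaOptions",
    "-Ds3service.server-side-encryption=AES256")
val sc = new SparkContext(conf)

Note that spark.executor.extraJavaOptions only affects executor JVMs; if the
driver touches S3 directly it still needs the -D option itself (or
spark.driver.extraJavaOptions).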


On Wed Jan 28 2015 at 10:17:09 AM Kohler, Curt E (ELS-STL) 
c.koh...@elsevier.com wrote:

   So, following up on your suggestion, I'm still having some problems
  getting the configuration changes recognized when my job runs.


  I've added jets3t.properties to the root of my application jar file that I
  submit to Spark (via spark-submit).

 I’ve verified that my jets3t.properties is at the root of my application
 jar by executing jar tf app.jar.

 I submit my job to the cluster with the following command.

  nohup ./bin/spark-submit --verbose --jars lib/app.jar --master spark://master-amazonaws.com:7077 --class com.elsevier.spark.SparkSync lib/app.jar > out.log &



 In my mainline of app.jar, I also added the following code:


  log.info(System.getProperty("java.class.path"));
  InputStream in =
      SparkSync.class.getClassLoader().getResourceAsStream("jets3t.properties");
  log.info(getStringFromInputStream(in));

 And I can see that the jets3t.properties I provided is found because it
 outputs:

 s3service.server-side-encryption=AES256

 It’s almost as if the hadoop/jets3t piece has already been initialized and
 is ignoring my jets3t.properties.

  I can get this all working inside Eclipse by including the folder
  containing my jets3t.properties on the classpath. But I can't get things
  working when submitting this to a Spark standalone cluster.

 Any insights would be appreciated.​
  --
 *From:* Thomas Demoor thomas.dem...@amplidata.com
 *Sent:* Tuesday, January 27, 2015 4:41 AM
 *To:* Kohler, Curt E (ELS-STL)
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark and S3 server side encryption

  Spark uses the Hadoop filesystems.

  I assume you are trying to use s3n:// which, under the hood, uses the
 3rd party jets3t library. It is configured through the jets3t.properties
 file (google hadoop s3n jets3t) which you should put on Spark's
 classpath. The setting you are looking for
 is s3service.server-side-encryption

   The latest version of Hadoop (2.6) introduces a new and improved s3a://
  filesystem which has the official SDK from Amazon under the hood.


 On Mon, Jan 26, 2015 at 10:01 PM, curtkohler c.koh...@elsevier.com
 wrote:

  We are trying to create a Spark job that writes out a file to S3 that
  leverages S3's server-side encryption for sensitive data. Typically this is
 accomplished by setting the appropriate header on the put request, but it
 isn't clear whether this capability is exposed in the Spark/Hadoop APIs.
 Does anyone have any suggestions?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-S3-server-side-encryption-tp21377.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con

2015-01-28 Thread Charles Feduke
Yeah, it sounds like your original exclusion of commons-httpclient from
hadoop-* was correct, but it's still coming in from somewhere.

Can you try something like this?:

<dependency>
  <groupId>commons-httpclient</groupId>
  <artifactId>commons-httpclient</artifactId>
  <scope>provided</scope>
</dependency>

ref:
http://stackoverflow.com/questions/4716310/is-there-a-way-to-exclude-a-maven-dependency-globally

(I don't know if a provided dependency will work without a specific version
number so I'm just making a guess here.)


On Wed Jan 28 2015 at 11:24:02 AM Emre Sevinc emre.sev...@gmail.com wrote:

 When I examine the dependencies again, I see that the SolrJ library is using
 v. 4.3.1 of org.apache.httpcomponents:httpclient

 [INFO] +- org.apache.solr:solr-solrj:jar:4.10.3:compile
 [INFO] |  +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile
 ==

 [INFO] |  +- org.apache.httpcomponents:httpcore:jar:4.3:compile
 [INFO] |  +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile
 [INFO] |  +- org.codehaus.woodstox:wstx-asl:jar:3.2.7:compile
 [INFO] |  \- org.noggit:noggit:jar:0.5:compile

 But hadoop-common 2.4.0 is using v. 3.1 of
 commons-httpclient:commons-httpclient:


 +- org.apache.hadoop:hadoop-common:jar:2.4.0:provided
 [INFO] |  +- commons-cli:commons-cli:jar:1.2:compile
 [INFO] |  +- org.apache.commons:commons-math3:jar:3.1.1:provided
 [INFO] |  +- xmlenc:xmlenc:jar:0.52:compile
 [INFO] |  +- commons-httpclient:commons-httpclient:jar:3.1:provided
 ===

 [INFO] |  +- commons-codec:commons-codec:jar:1.4:compile

 So my reasoning was: I have to exclude v. 3.1 of
 commons-httpclient:commons-httpclient and force the use of httpclient v.
 4.3.1, which SolrJ declares as a dependency.

 But apparently it does not work; I have also tried your latest suggestion
 (changing the 'exclusion' to org.apache.httpcomponents / httpclient), and
 I'm still getting the same exception:


 java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
   at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
   at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
   at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
   at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
   at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
   at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
   at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
   at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)


 Maybe it is about Hadoop 2.4.0, but I think this is what is included in
 the binary download of Spark. I've also tried it with Spark 1.2.0 binary
 (pre-built for Hadoop 2.4 and later).

 Or maybe I'm totally wrong, and the problem / fix is something completely
 different?

 --
 Emre




 On Wed, Jan 28, 2015 at 4:58 PM, Charles Feduke charles.fed...@gmail.com
 wrote:

 It looks like you're shading in the Apache HttpComponents (HttpClient)
 library, and it's a different version than what is expected. (Maybe 4.6.x,
 based on the Javadoc.)

 I see you are attempting to exclude commons-httpclient by using:

  <exclusion>
    <groupId>commons-httpclient</groupId>
    <artifactId>*</artifactId>
  </exclusion>

 in your pom. However, what I think you really want is:

  <exclusion>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
  </exclusion>

 The last time the groupId was commons-httpclient was Aug 2007 as
 version 3.1 (search.maven.com). I hope none of your dependencies rely on
 that particular version. SchemeRegistryFactory was introduced in version
 4.3.1 of httpcomponents so even if by chance one of them did rely on
 commons-httpclient there wouldn't be a class conflict.



 On Wed Jan 28 2015 at 9:19:20 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 This is what I get:

   ./bigcontent-1.0-SNAPSHOT.jar:org/apache/http/impl/conn/SchemeRegistryFactory.class

 (probably because I'm using a self-contained JAR).

 In other words, I'm still stuck.

 --
 Emre


 On Wed, Jan 28, 2015 at 2:47 PM, Charles Feduke 
 charles.fed...@gmail.com wrote:

 I deal with problems like this so often across Java applications with
 large dependency trees. Add the shell function at the following link to
 your shell on the machine where your Spark Streaming is installed:

 https://gist.github.com/cfeduke/fe63b12ab07f87e76b38

 Then run in the directory where your JAR files are:

 find-java-class SchemeRegistryFactory

  (I know you said HttpClient, but the error indicates that an overload or
  method of SchemeRegistryFactory is missing from the class that is loaded by
  the class loader. The class loader

Re: spark 1.2 ec2 launch script hang

2015-01-27 Thread Charles Feduke
Absolute path means no ~ and also verify that you have the path to the file
correct. For some reason the Python code does not validate that the file
exists and will hang (this is the same reason why ~ hangs).
On Mon, Jan 26, 2015 at 10:08 PM Pete Zybrick pzybr...@gmail.com wrote:

 Try using an absolute path to the pem file



  On Jan 26, 2015, at 8:57 PM, ey-chih chow eyc...@hotmail.com wrote:
 
  Hi,
 
  I used the spark-ec2 script of spark 1.2 to launch a cluster.  I have
  modified the script according to
 
  https://github.com/grzegorz-dubicki/spark/commit/5dd8458d2ab9753aae939b3bb33be953e2c13a70
 
  But the script was still hung at the following message:
 
  Waiting for cluster to enter 'ssh-ready'
  state.
 
  Any additional thing I should do to make it succeed?  Thanks.
 
 
  Ey-Chih Chow
 
 
 
  --
  View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-ec2-launch-script-hang-tp21381.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: HW imbalance

2015-01-26 Thread Charles Feduke
You should look at using Mesos. This should abstract away the individual
hosts into a pool of resources and make the different physical
specifications manageable.

I haven't tried configuring Spark Standalone mode to have different specs
on different machines but based on spark-env.sh.template:

# - SPARK_WORKER_CORES, to set the number of cores to use on this machine

# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give
executors (e.g. 1000m, 2g)

# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g.
-Dx=y)

it looks like you should be able to mix. (It's not clear to me whether
SPARK_WORKER_MEMORY is uniform across the cluster or applies only to the
machine where the config file resides.)

On Mon Jan 26 2015 at 8:07:51 AM Antony Mayi antonym...@yahoo.com.invalid
wrote:

 Hi,

  Is it possible to mix hosts with (significantly) different specs within a
  cluster (without wasting the extra resources)? For example, having 10 nodes
  with 36GB RAM/10CPUs and now trying to add 3 hosts with 128GB/10CPUs - is
  there a way for Spark executors to utilize the extra memory (as my
  understanding is that all Spark executors must have the same memory)?

 thanks,
 Antony.



Re: No AMI for Spark 1.2 using ec2 scripts

2015-01-26 Thread Charles Feduke
I definitely have Spark 1.2 running within EC2 using the spark-ec2 scripts.
I downloaded Spark 1.2 prebuilt for Hadoop 2.4 and later.

What parameters are you using when you execute spark-ec2?

I am launching in the us-west-1 region (ami-7a320f3f) which may explain
things.

On Mon Jan 26 2015 at 2:40:01 AM hajons haj...@gmail.com wrote:

 Hi,

 When I try to launch a standalone cluster on EC2 using the scripts in the
 ec2 directory for Spark 1.2, I get the following error:

 Could not resolve AMI at:
 https://raw.github.com/mesos/spark-ec2/v4/ami-list/us-east-1/pvm

 It seems there is not yet any AMI available on EC2. Any ideas when there
 will be one?

 This works without problems for version 1.1. Starting up a cluster using
 these scripts is so simple and straightforward, so I am really missing it
 on
 1.2.

 /Håkan



 --
  View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-AMI-for-Spark-1-2-using-ec2-scripts-tp21362.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)

2015-01-25 Thread Charles Feduke
I've got my solution working:

https://gist.github.com/cfeduke/3bca88ed793ddf20ea6d

I couldn't actually perform the steps I outlined in the previous message in
this thread because I would ultimately be trying to serialize a
SparkContext to the workers to use during the generation of 1..*n* JdbcRDDs.
So I took a look at the source for JdbcRDD and it was trivial to adjust to
my needs.

This got me thinking about your problem; the JdbcRDD that ships with Spark
will shard the query across the cluster by a Long ID value (requiring you
to put ? placeholders in your query for use as range boundaries), so if
you've got such a key - or any series field that happens to be a Long -
then you'd just need the PostgreSQL JDBC driver and your Redshift JDBC
URL:
http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html
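
For reference, a minimal sketch of the stock JdbcRDD usage against such a
Long key (the table, columns, bounds, and connection details here are all
hypothetical):

// Shard a query across the cluster by a Long primary key with Spark 1.x's JdbcRDD.
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

def eventsRdd(sc: SparkContext) = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(
    "jdbc:postgresql://example.redshift.amazonaws.com:5439/analytics", // hypothetical URL
    "user", "password"),
  "SELECT id, event_type FROM events WHERE id >= ? AND id <= ?", // ? = range bounds
  1L,        // lowerBound
  10000000L, // upperBound
  20,        // numPartitions: 20 sub-ranges spread across the executors
  (rs: ResultSet) => (rs.getLong("id"), rs.getString("event_type")))

Each partition runs the query with its own sub-range substituted for the two
? placeholders, so no single node has to pull the whole result set.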

If you have something other than Long for your primary key/series data type
then you can do the same thing I did and modify a copy of JdbcRDD, though
your changes would be even fewer than my own. (Though I can't see anything
much different than a Long or date/time working for this since it has to
partition the full range into appropriate sub-ranges.)

Because of the sub-range bucketing and cluster distribution you shouldn't
run into OOM errors, assuming you provision sufficient worker nodes in the
cluster.

On Sun Jan 25 2015 at 9:39:56 AM Charles Feduke charles.fed...@gmail.com
wrote:

 I'm facing a similar problem except my data is already pre-sharded in
 PostgreSQL.

 I'm going to attempt to solve it like this:

 - Submit the shard names (database names) across the Spark cluster as a
 text file and partition it so workers get 0 or more - hopefully 1 - shard
 name. In this case you could partition ranges - if your primary key is a
 datetime, then a start/end datetime pair; or if its a long then a start/end
 long pair. (You may need to run a separate job to get your overall
 start/end pair and then calculate how many partitions you need from there.)

 - Write the job so that the worker loads data from its shard(s) and unions
 the RDDs together. In the case of pairs the concept is the same. Basically
 look at how the JdbcRDD constructor requires a start, end, and query
 (disregard numPartitions in this case since we're manually partitioning in
 the step above). Your query will be its initial filter conditions plus a
 between condition for the primary key and its pair.

 - Operate on the union RDDs with other transformations or filters.

 If everything works as planned then the data should be spread out across
 the cluster and no one node will be responsible for loading TiBs of data
 and then distributing it to its peers. That should help with your OOM
 problem.

 Of course this does not guarantee that the data is balanced across nodes.
 With a large amount of data it should balance well enough to get the job
 done though.

 (You may need to run several refinements against the complete dataset to
 figure out the appropriate start/end pair values to get an RDD that is
 partitioned and balanced across the workers. This is a task best performed
 using aggregate query logic or stored procedures. With my shard problem I
 don't have this option available.)

 Unless someone has a better idea, in which case I'd love to hear it.


 On Sun Jan 25 2015 at 4:19:38 AM Denis Mikhalkin deni...@yahoo.com.invalid
 wrote:

 Hi Nicholas,

 thanks for your reply. I checked spark-redshift - it's just for the
 unload data files stored on hadoop, not for online result sets from DB.

 Do you know of any example of a custom RDD which fetches the data on the
 fly (not reading from HDFS)?

 Thanks.

 Denis

   --
  *From:* Nicholas Chammas nicholas.cham...@gmail.com
 *To:* Denis Mikhalkin deni...@yahoo.com; user@spark.apache.org 
 user@spark.apache.org
 *Sent:* Sunday, 25 January 2015, 3:06
 *Subject:* Re: Analyzing data from non-standard data sources (e.g. AWS
 Redshift)

 I believe databricks provides an rdd interface to redshift. Did you check
 spark-packages.org?
 On 2015년 1월 24일 (토) at 오전 6:45 Denis Mikhalkin deni...@yahoo.com.invalid
 wrote:

 Hello,

 we've got some analytics data in AWS Redshift. The data is being
 constantly updated.

 I'd like to be able to write a query against Redshift which would return
 a subset of data, and then run a Spark job (Pyspark) to do some analysis.

 I could not find an RDD which would let me do it OOB (Python), so I tried
 writing my own. For example, I tried combining a generator (via yield)
 with parallelize. It appears, though, that parallelize reads all the data
 first into memory as I get either OOM or Python swaps as soon as I increase
 the number of rows beyond trivial limits.

 I've also looked at Java RDDs (there is an example of MySQL RDD) but it
 seems that it also reads all the data into memory.

 So my question is - how to correctly feed Spark with huge datasets which
 don't initially reside in HDFS/S3 (ideally

Re: where storagelevel DISK_ONLY persists RDD to

2015-01-25 Thread Charles Feduke
I think you want to instead use `.saveAsSequenceFile` to save an RDD to
someplace like HDFS or NFS it you are attempting to interoperate with
another system, such as Hadoop. `.persist` is for keeping the contents of
an RDD around so future uses of that particular RDD don't need to
recalculate its composite parts.
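
A minimal sketch of the distinction, assuming sc is an existing SparkContext
(e.g. in spark-shell) and the HDFS path is made up:

// persist keeps the RDD around for reuse within this application;
// saveAsSequenceFile writes it out where other systems (e.g. Hadoop) can read it.
import org.apache.spark.SparkContext._ // pair-RDD implicits for Spark 1.x
import org.apache.spark.storage.StorageLevel

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
pairs.persist(StorageLevel.DISK_ONLY)             // spilled to each executor's local disk
pairs.saveAsSequenceFile("hdfs:///tmp/pairs-seq") // materialized to HDFS (or an NFS mount path)

DISK_ONLY blocks live in the executors' local scratch directories
(spark.local.dir), not in a location you choose, which is why writing the
data out explicitly is the way to hand it to another system.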

On Sun Jan 25 2015 at 3:36:31 AM Larry Liu larryli...@gmail.com wrote:

 I would like to persist RDD TO HDFS or NFS mount. How to change the
 location?



Re: Analyzing data from non-standard data sources (e.g. AWS Redshift)

2015-01-25 Thread Charles Feduke
I'm facing a similar problem except my data is already pre-sharded in
PostgreSQL.

I'm going to attempt to solve it like this:

- Submit the shard names (database names) across the Spark cluster as a
text file and partition it so workers get 0 or more - hopefully 1 - shard
name. In this case you could partition ranges - if your primary key is a
datetime, then a start/end datetime pair; or if its a long then a start/end
long pair. (You may need to run a separate job to get your overall
start/end pair and then calculate how many partitions you need from there.)

- Write the job so that the worker loads data from its shard(s) and unions
the RDDs together. In the case of pairs the concept is the same. Basically
look at how the JdbcRDD constructor requires a start, end, and query
(disregard numPartitions in this case since we're manually partitioning in
the step above). Your query will be its initial filter conditions plus a
between condition for the primary key and its pair.

- Operate on the union RDDs with other transformations or filters.

If everything works as planned then the data should be spread out across
the cluster and no one node will be responsible for loading TiBs of data
and then distributing it to its peers. That should help with your OOM
problem.

Of course this does not guarantee that the data is balanced across nodes.
With a large amount of data it should balance well enough to get the job
done though.

(You may need to run several refinements against the complete dataset to
figure out the appropriate start/end pair values to get an RDD that is
partitioned and balanced across the workers. This is a task best performed
using aggregate query logic or stored procedures. With my shard problem I
don't have this option available.)

Unless someone has a better idea, in which case I'd love to hear it.


On Sun Jan 25 2015 at 4:19:38 AM Denis Mikhalkin deni...@yahoo.com.invalid
wrote:

 Hi Nicholas,

 thanks for your reply. I checked spark-redshift - it's just for the unload
 data files stored on hadoop, not for online result sets from DB.

 Do you know of any example of a custom RDD which fetches the data on the
 fly (not reading from HDFS)?

 Thanks.

 Denis

   --
  *From:* Nicholas Chammas nicholas.cham...@gmail.com
 *To:* Denis Mikhalkin deni...@yahoo.com; user@spark.apache.org 
 user@spark.apache.org
 *Sent:* Sunday, 25 January 2015, 3:06
 *Subject:* Re: Analyzing data from non-standard data sources (e.g. AWS
 Redshift)

 I believe databricks provides an rdd interface to redshift. Did you check
 spark-packages.org?
 On 2015년 1월 24일 (토) at 오전 6:45 Denis Mikhalkin deni...@yahoo.com.invalid
 wrote:

 Hello,

 we've got some analytics data in AWS Redshift. The data is being
 constantly updated.

 I'd like to be able to write a query against Redshift which would return a
 subset of data, and then run a Spark job (Pyspark) to do some analysis.

 I could not find an RDD which would let me do it OOB (Python), so I tried
 writing my own. For example, I tried combining a generator (via yield)
 with parallelize. It appears, though, that parallelize reads all the data
 first into memory as I get either OOM or Python swaps as soon as I increase
 the number of rows beyond trivial limits.

 I've also looked at Java RDDs (there is an example of MySQL RDD) but it
 seems that it also reads all the data into memory.

 So my question is - how to correctly feed Spark with huge datasets which
 don't initially reside in HDFS/S3 (ideally for Pyspark, but would
 appreciate any tips)?

 Thanks.

 Denis







JDBC sharded solution

2015-01-24 Thread Charles Feduke
I'm trying to figure out the best approach to getting sharded data from
PostgreSQL into Spark.

Our production PGSQL cluster has 12 shards with TiBs of data on each shard.
(I won't be accessing all of the data on a shard at once, but I don't think
it's feasible to use Sqoop to copy tables whose data will be out of date
rather quickly.) We are using RDS replication in AWS, so read-heavy queries
against the sharded data are okay.

I see that the JdbcRDD is really designed to take data in chunks from a
single datasource, where partitioning will spread the chunks across the
cluster. This is neat for a single JDBC datasource but inconvenient when
the data is already sharded.

My current plan is to create a small text file with the shard names of our
cluster and partition it across the Spark cluster. From there I will use
custom code to process a SQL statement in the context of a JdbcRDD,
generating 1..*n* Connections (and 1..*n* JdbcRDDs) with a partition size
of 1, so each worker will handle 1..*n* shards [ideally 1]; those RDDs will
then be unioned together [when more than 1 RDD] to get a shard's worth of
data satisfying the SQL query into the worker for further processing.
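
Roughly, a sketch of that plan (the shard URLs, credentials, query, and
bounds are hypothetical; the per-shard JdbcRDDs are built on the driver and
unioned there):

// One JdbcRDD per shard with a single partition each, unioned into one RDD.
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{JdbcRDD, RDD}

def shardedQuery(sc: SparkContext, shardJdbcUrls: Seq[String]): RDD[(Long, String)] = {
  val perShard = shardJdbcUrls.map { url =>
    new JdbcRDD(
      sc,
      () => DriverManager.getConnection(url, "user", "password"),
      "SELECT id, payload FROM events WHERE id >= ? AND id <= ?",
      1L, 100000000L, 1, // one partition, so one worker handles the whole shard
      (rs: ResultSet) => (rs.getLong("id"), rs.getString("payload")))
  }
  sc.union(perShard)
}

Whether this distributes evenly of course depends on how balanced the shards
themselves are.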

It seems like there should already be an established solution for this
pattern though so I want to see if I am going about this entirely the wrong
way and should instead be using something else.

(If it matters we also have a second datacenter Cassandra cluster for
executing analytic queries against that I could use if necessary.
Originally I was going to ETL the PGSQL data into this cluster but that
poses its own set of challenges.)