Does spark *always* fork its workers?

2015-02-18 Thread Kevin Burton
I want to map over a Cassandra table in Spark but my code that executes
needs a shutdown() call to return any threads, release file handles, etc.

Will Spark always execute my mappers as a forked process? And if so, how do
I handle threads preventing the JVM from terminating?

It would be nice if there was a way to clean up after yourself gracefully
in map jobs but I don’t think that exists right now.

-- 

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile
https://plus.google.com/102718274791889610666/posts
http://spinn3r.com


Re: Spark Streaming output cannot be used as input?

2015-02-18 Thread Emre Sevinc
Hello Jose,

We've hit the same issue a couple of months ago. It is possible to write
directly to files instead of creating directories, but it is not
straightforward, and I haven't seen any clear demonstration in books,
tutorials, etc.

We do something like:

SparkConf sparkConf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(batchInterval));
JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
MyModuleApp.process(stream);

And then in the process method:

@Override
public void process(JavaDStream<String> inStream) {
  JavaDStream<String> json = inStream.map(
      new MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir));
  forceOutput(json);
}


This, in turn, calls the following (I've removed the irrelevant lines
to focus on writing):


public class MyModuleWorker implements Function<String,String> {

  public String call(String json) {
    // process the data and then write it
    writeJSON(json, validatedJSONoutputDir_);
  }
}

And the writeJSON method is:

public static final void writeJSON(String json, String jsonDirPath) throws IOException {
  String jsonFileName = jsonDirPath + "/" + UUID.randomUUID().toString() + ".json.tmp";
  URI uri = URI.create(jsonFileName);
  Configuration conf = new Configuration();
  FileSystem fileSystem = FileSystem.get(uri, conf);
  FSDataOutputStream out = fileSystem.create(new Path(uri));
  out.write(json.getBytes(StandardCharsets.UTF_8));
  out.close();
  fileSystem.rename(new Path(uri),
      new Path(URI.create(jsonDirPath + "/" + UUID.randomUUID().toString() + ".json")));
}


Using a similar technique you might be able to achieve your objective.

Kind regards,

Emre Sevinç
http://www.bigindustries.be/



On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez jfernan...@sdl.com wrote:

  Hello folks,



 Our intended use case is:

 -  Spark Streaming app #1 reads from RabbitMQ and output to HDFS

 -  Spark Streaming app #2 reads #1’s output and stores the data
 into Elasticsearch



 The idea behind this architecture is that if Elasticsearch is down due to
 an upgrade or system error we don’t have to stop reading messages from the
 queue. We could also scale each process separately as needed.



 After a few hours of research, my understanding is that Spark Streaming
 outputs files in a *directory* for which you provide the prefix and suffix.
 This is despite the ScalaDoc for DStream saveAsObjectFiles suggesting
 otherwise:



   /**

* Save each RDD in this DStream as a Sequence file of serialized
 objects.

* The file name at each batch interval is generated based on `prefix`
 and

* `suffix`: prefix-TIME_IN_MS.suffix.

*/



 Spark Streaming can monitor an HDFS directory for files but subfolders are
 not supported. So as far as I can tell, it is not possible to use Spark
 Streaming output as input for a different Spark Streaming app without
 somehow performing a separate operation in the middle.



 Am I missing something obvious? I’ve read some suggestions like using
 Hadoop to merge the directories (whose names I don’t see how you would
 know) and to reduce the partitions to 1 (which wouldn’t help).



 Any other suggestions? What is the expected pattern a developer would
 follow that would make Spark Streaming’s output format usable?









-- 
Emre Sevinc


Cannot access Spark web UI

2015-02-18 Thread Mukesh Jha
Hello Experts,

I am running a spark-streaming app inside YARN. I have Spark History server
running as well (Do we need it running to access UI?).

The app is running fine as expected but the Spark's web UI is not
accessible.

When I try to access the ApplicationMaster of the Yarn application I get
the below error.

This looks very similar to https://issues.apache.org/jira/browse/SPARK-5837
but instead of java.net.ConnectException: Connection refused I am getting
java.net.BindException: Cannot assign requested address as shown below.

Please let me know if you have faced / fixed this issue, any help is
greatly appreciated.


*Exception*

HTTP ERROR 500

Problem accessing /proxy/application_1424161379156_0001/. Reason:

Cannot assign requested address

Caused by:

java.net.BindException: Cannot assign requested address
at java.net.PlainSocketImpl.socketBind(Native Method)
at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376)
at java.net.Socket.bind(Socket.java:631)
at java.net.Socket.init(Socket.java:423)
at java.net.Socket.init(Socket.java:280)
at
org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:80)
at
org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:122)
at
org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
at
org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
at
org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:346)
at
org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:188)
at
org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:345)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
at
com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66)
at
com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900)
at
com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
at
org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:84)
at
com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
at
com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
at
com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
at
com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at
org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at
org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:592)
at
org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:555)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at
org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1223)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
at
org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:767)
at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
at
org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
at org.mortbay.jetty.Server.handle(Server.java:326)
at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
at
org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
at 

Re: Cannot access Spark web UI

2015-02-18 Thread Akhil Das
The error says "Cannot assign requested address". This means that you need to
use the correct address for one of your network interfaces, or 0.0.0.0 to
accept connections from all interfaces. Can you paste your spark-env.sh
file and /etc/hosts file?

Thanks
Best Regards

On Wed, Feb 18, 2015 at 2:06 PM, Mukesh Jha me.mukesh@gmail.com wrote:

 Hello Experts,

 I am running a spark-streaming app inside YARN. I have Spark History
 server running as well (Do we need it running to access UI?).

 The app is running fine as expected but the Spark's web UI is not
 accessible.

 When I try to access the ApplicationMaster of the Yarn application I get
 the below error.

 This looks very similar to
 https://issues.apache.org/jira/browse/SPARK-5837 but instead of 
 java.net.ConnectException:
 Connection refused I am getting java.net.BindException: Cannot assign
 requested address as shown below.

 Please let me know if you have faced / fixed this issue, any help is
 greatly appreciated.


 *Exception*

 HTTP ERROR 500

 Problem accessing /proxy/application_1424161379156_0001/. Reason:

 Cannot assign requested address

 Caused by:

 java.net.BindException: Cannot assign requested address

RE: spark-core in a servlet

2015-02-18 Thread Anton Brazhnyk
Check the dependencies. It looks like you have a conflict around the servlet-api
jars. Maven's dependency:tree, some exclusions, and some luck :) could help.


From: Ralph Bergmann | the4thFloor.eu [ra...@the4thfloor.eu]
Sent: Tuesday, February 17, 2015 4:14 PM
To: user@spark.apache.org
Subject: spark-core in a servlet

Hi,


I want to use spark-core inside an HttpServlet. I use Maven for the
build task, but I have a dependency problem :-(

I get this error message:

ClassCastException:
com.sun.jersey.server.impl.container.servlet.JerseyServletContainerInitializer
cannot be cast to javax.servlet.ServletContainerInitializer

When I add these exclusions it builds, but then other classes are
not found at runtime:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>1.2.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>


What can I do?


Thanks a lot!,

Ralph

--

Ralph Bergmann

iOS and Android app developer


www  http://www.the4thFloor.eu

mail ra...@the4thfloor.eu
skypedasralph

google+  https://plus.google.com/+RalphBergmann
xing https://www.xing.com/profile/Ralph_Bergmann3
linkedin https://www.linkedin.com/in/ralphbergmann
gulp https://www.gulp.de/Profil/RalphBergmann.html
github   https://github.com/the4thfloor


pgp key id   0x421F9B78
pgp fingerprint  CEE3 7AE9 07BE 98DF CD5A E69C F131 4A8E 421F 9B78



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-18 Thread Emre Sevinc
Hello Imran,

(a) I know that all 20 files are processed when I use foreachRDD, because I
can see the processed files in the output directory. (My application logic
writes them to an output directory after they are processed, *but* that
writing operation does not happen in foreachRDD, below you can see the URL
that includes my code and clarifies this).

(b) I know only 16 files are processed because in the output directory I
see only 16 files processed. I wait for minutes and minutes and no more
files appear in the output directory. Once I see that only 16 files are
processed and Spark Streaming has gone into the mode of idly watching the input
directory, if I then copy a few more files, they are also processed.

(c) Sure, you can see part of my code in the following gist:
https://gist.github.com/emres/0fb6de128baea099e741
 It might seem a little convoluted at first, because my application is
divided into two classes, a Driver class (setting up things and
initializing them), and a Worker class (that implements the core
functionality). I've also put the relevant methods from my utility
classes for completeness.

I am as perplexed as you are as to why forcing the output via foreachRDD
ended up in different behaviour compared to simply using print() method.
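
For reference, a minimal Scala sketch (with made-up names) of the two ways of
forcing output being compared here; both register an output operation, so in
principle either should trigger the same processing:

import org.apache.spark.streaming.dstream.DStream

// Both register an output operation, which is what makes Spark Streaming
// actually run the lineage at every batch interval.
def forceWithPrint(processed: DStream[String]): Unit =
  processed.print()              // prints only the first few elements of each batch

def forceWithForeachRDD(processed: DStream[String]): Unit =
  processed.foreachRDD { rdd =>
    rdd.foreach(_ => ())         // no-op action that still materializes every record
  }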

Kind regards,
Emre



On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras...@cloudera.com wrote:

 Hi Emre,

 there shouldn't be any difference in which files get processed w/ print()
 vs. foreachRDD().  In fact, if you look at the definition of print(), it is
 just calling foreachRDD() underneath.  So there is something else going on
 here.

 We need a little more information to figure out exactly what is going on.
  (I think Sean was getting at the same thing ...)

 (a) how do you know that when you use foreachRDD, all 20 files get
 processed?

 (b) How do you know that only 16 files get processed when you print()? Do
 you know the other files are being skipped, or maybe they are just stuck
 somewhere?  eg., suppose you start w/ 20 files, and you see 16 get
 processed ... what happens after you add a few more files to the
 directory?  Are they processed immediately, or are they never processed
 either?

 (c) Can you share any more code of what you are doing to the dstreams
 *before* the print() / foreachRDD()?  That might give us more details about
 what the difference is.

 I can't see how .count.println() would be different than just println(),
 but maybe I am missing something also.

 Imran

 On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Sean,

 In this case, I've been testing the code on my local machine and using
 Spark locally, so all the log output was available on my terminal. And
 I've used the .print() method to have an output operation, just to force
 Spark to execute.

 And I was not using foreachRDD, I was only using print() method on a
 JavaDStream object, and it was working fine for a few files, up to 16 (and
 without print() it did not do anything because there were no output
 operations).

 To sum it up, in my case:

  - Initially, use .print() and no foreachRDD: processes up to 16 files
 and does not do anything for the remaining 4.
  - Remove .print() and use foreachRDD: processes all of the 20 files.

 Maybe, as in Akhil Das's suggestion, using .count.print() might also have
 fixed my problem, but I'm satisfied with the foreachRDD approach for now.
 (Though it is still a mystery to me why using .print() made a difference.
 Maybe my mental model of Spark is wrong: I thought that no matter what output
 operation I used, the number of files processed by Spark would be independent
 of it, because the processing is done in a different method and .print() is
 only used to force Spark to execute that processing. Am I wrong?)

 --
 Emre


 On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote:

 Materialization shouldn't be relevant. The collect by itself doesn't let
 you detect whether it happened. Print should print some results to the
 console but on different machines, so may not be a reliable way to see what
 happened.

 Yes I understand your real process uses foreachRDD and that's what you
 should use. It sounds like that works. But you must always have been using
 that right? What do you mean that you changed to use it?

 Basically I'm not clear on what the real code does and what about the
 output of that code tells you only 16 files were processed.
 On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Sean,

 I did not understand your question very well, but what I do is checking
 the output directory (and I have various logger outputs at various stages
 showing the contents of an input file being processed, the response from
 the web service, etc.).

 By the way, I've already solved my problem by using foreachRDD instead
 of print (see my second message in this thread). Apparently forcing Spark
 to materialize DAG via print() is not the way to go. (My interpretation
 might be 

Re: Re: Problem with 1 master + 2 slaves cluster

2015-02-18 Thread Emre Sevinc
On Wed, Feb 18, 2015 at 10:23 AM, bit1...@163.com bit1...@163.com wrote:

 Sure, thanks Akhil.
 A further question : Is local file system(file:///) not supported in
 standalone cluster?



FYI: I'm able to write to local file system (via HDFS API and using
file:/// notation) when using Spark.


--
Emre Sevinç
http://www.bigindustries.be/


Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-18 Thread Emre Sevinc
Thanks to everyone for suggestions and explanations.

Currently I've started to experiment with the following scenario, that
seems to work for me:

- Put the properties file on a web server so that it is centrally available
- Pass it to the Spark driver program via --conf
'propertiesFile=http://myWebServer.com/mymodule.properties'
- And then load the configuration using Apache Commons Configuration:

PropertiesConfiguration config = new PropertiesConfiguration();
config.load(System.getProperty("propertiesFile"));

Using the method described above, I don't need to statically compile my
properties file into the über JAR anymore. I can modify the file on the web
server, and when I submit my application via spark-submit, passing the URL
of the properties file, the driver program reads the contents of that file
once, retrieves the values of the keys, and continues.

PS: I've opted for Apache Commons Configuration because it is already among
the many dependencies I have in my pom.xml, and I did not want to pull in
another library, even though the Typesafe Config library seems to be a
powerful and flexible choice, too.

--
Emre



On Tue, Feb 17, 2015 at 6:12 PM, Charles Feduke charles.fed...@gmail.com
wrote:

 Emre,

 As you are keeping the properties file external to the JAR you need to
 make sure to submit the properties file as an additional --files (or
 whatever the necessary CLI switch is) so all the executors get a copy of
 the file along with the JAR.

 If you know you are going to just put the properties file on HDFS then why
 don't you define a custom system setting like properties.url and pass it
 along:

 (this is for Spark shell, the only CLI string I have available at the
 moment:)

 spark-shell --jars $JAR_NAME \
 --conf 'properties.url=hdfs://config/stuff.properties' \
 --conf 'spark.executor.extraJavaOptions=-Dproperties.url=hdfs://config/stuff.properties'

 ... then load the properties file during initialization by examining the
 properties.url system setting.
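
 For illustration, a minimal Scala sketch of that initialization step; the
 helper name and error message are made up, and the Hadoop FileSystem client is
 used so an hdfs:// location works (a plain java.net.URL would do for http://
 or file:// locations):

 import java.util.Properties
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

 // Resolve the custom properties.url system setting at startup and load it.
 def loadJobProperties(): Properties = {
   val url = Option(System.getProperty("properties.url"))
     .getOrElse(sys.error("missing -Dproperties.url"))
   val path = new Path(url)
   val in = path.getFileSystem(new Configuration()).open(path)
   val props = new Properties()
   try props.load(in) finally in.close()
   props
 }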

 I'd still strongly recommend Typesafe Config as it makes this a lot less
 painful, and I know for certain you can place your *.conf at a URL (using
 the -Dconfig.url=) though it probably won't work with an HDFS URL.



 On Tue Feb 17 2015 at 9:53:08 AM Gerard Maas gerard.m...@gmail.com
 wrote:

 +1 for TypeSafe config
 Our practice is to include all spark properties under a 'spark' entry in
 the config file alongside job-specific configuration:

 A config file would look like:
 spark {
   master = 
   cleaner.ttl = 123456
   ...
 }
 job {
   context {
     src = foo
     action = barAction
   }
   prop1 = val1
 }

 Then, to create our Spark context, we transparently pass the spark
 section to a SparkConf instance.
 This idiom will instantiate the context with the spark specific
 configuration:


 sparkConfig.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))

 And we can make use of the config object everywhere else.

 We use the override model of the typesafe config: reasonable defaults go
 in the reference.conf (within the jar). Environment-specific overrides go
 in the application.conf (alongside the job jar) and hacks are passed with
 -Dprop=value :-)
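
 (The configToStringSeq helper isn't shown above; one possible shape for it,
 as a sketch using the Typesafe Config API, could be:)

 import com.typesafe.config.Config
 import scala.collection.JavaConverters._

 // Flatten a Config into (key, value) pairs that SparkConf.setAll accepts.
 def configToStringSeq(config: Config): Seq[(String, String)] =
   config.entrySet().asScala.toSeq.map { entry =>
     entry.getKey -> entry.getValue.unwrapped().toString
   }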


 -kr, Gerard.


 On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 I've decided to try

   spark-submit ... --conf spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties

 But when I try to retrieve the value of propertiesFile via

System.err.println("propertiesFile : " + System.getProperty("propertiesFile"));

 I get NULL:

propertiesFile : null

 Interestingly, when I run spark-submit with --verbose, I see that it
 prints:

   spark.driver.extraJavaOptions -> -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties

 I couldn't understand why I couldn't get to the value of
 propertiesFile by using the standard System.getProperty method. (I can use
 new SparkConf().get("spark.driver.extraJavaOptions") and manually parse
 it to retrieve the value, but I'd like to know why I cannot retrieve that
 value using the System.getProperty method).

 Any ideas?

 If I can achieve what I've described above properly, I plan to pass a
 properties file that resides on HDFS, so that it will be available to my
 driver program wherever that program runs.

 --
 Emre




 On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke 
 charles.fed...@gmail.com wrote:

 I haven't actually tried mixing non-Spark settings into the Spark
 properties. Instead I package my properties into the jar and use the
 Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
 specific) to get at my properties:

 Properties file: src/main/resources/integration.conf

 (below $ENV might be set to either integration or prod[3])

 ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
 --conf 'config.resource=$ENV.conf' \
 --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

cannot connect to Spark Application Master in YARN

2015-02-18 Thread rok
I'm trying to access the Spark UI for an application running through YARN.
Clicking on the Application Master under Tracking UI I get an HTTP ERROR
500: 

HTTP ERROR 500

Problem accessing /proxy/application_1423151769242_0088/. Reason:

Connection refused

Caused by:

java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at java.net.Socket.init(Socket.java:425)
at java.net.Socket.init(Socket.java:280)
at
org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:80)
at
org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:122)
at
org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
at
org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
at
org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:346)
at
org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:187)
at
org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:344)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at 
org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
at
com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66)
at
com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900)
at
com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
at
org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:84)
at
com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
at
com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
at
com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
at
com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at
org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at
org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:572)
at
org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter.doFilter(DelegationTokenAuthenticationFilter.java:269)
at
org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:542)
at
org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter.doFilter(RMAuthenticationFilter.java:84)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at
org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1224)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
at
org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at 
org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
at
org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
at 
org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
at 
org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
at

Re: cannot connect to Spark Application Master in YARN

2015-02-18 Thread Sean Owen
Can you track your comments on the existing issue?

https://issues.apache.org/jira/browse/SPARK-5837

I personally can't reproduce this but more info would help narrow it down.

On Wed, Feb 18, 2015 at 10:58 AM, rok rokros...@gmail.com wrote:
 I'm trying to access the Spark UI for an application running through YARN.
 Clicking on the Application Master under Tracking UI I get an HTTP ERROR
 500:

 HTTP ERROR 500

 Problem accessing /proxy/application_1423151769242_0088/. Reason:

 Connection refused

 Caused by:

 java.net.ConnectException: Connection refused

Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Juan Rodríguez Hortalá
Hi,

I'm writing a Spark program where I want to divide an RDD into different
groups, but the groups are too big to use groupByKey. To cope with that,
since I know in advance the list of keys for each group, I build a map from
the keys to the RDDs that result from filtering the input RDD to get the
records for the corresponding key. This works when I have a small number of
keys, but for a large number of keys (tens of thousands) the execution gets
stuck without issuing any new Spark stage. I suspect the reason is that
the Spark scheduler is not able to handle so many RDDs. Does that make sense?
I'm rewriting the program to use a single RDD of pairs, with cached
partitions, but I wanted to be sure I understand the problem here.
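
To illustrate the two approaches, a minimal Scala sketch with made-up types:
the first creates one filtered RDD per key for the driver to track, the second
keeps everything in a single keyed RDD.

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// (1) One filtered RDD per key: tens of thousands of RDD objects and lineages
//     for the driver/scheduler to track.
def perKeyRdds(input: RDD[(String, Int)], keys: Seq[String]): Map[String, RDD[Int]] =
  keys.map(k => k -> input.filter(_._1 == k).values).toMap

// (2) A single RDD of pairs, partitioned by key and cached, so each group is
//     just a set of partitions inside one RDD.
def singleKeyedRdd(input: RDD[(String, Int)], numPartitions: Int): RDD[(String, Int)] =
  input.partitionBy(new HashPartitioner(numPartitions)).cache()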

Thanks a lot in advance,

Greetings,

Juan Rodriguez


spark 1.2 slower than 1.0 in unit tests

2015-02-18 Thread Marcin Cylke
Hi

We're using Spark in our app's unit tests. The tests start a Spark
context with local[*], and the test time is now 178 seconds on Spark 1.2
instead of 41 seconds on 1.0.

We are using spark version from cloudera CDH (1.2.0-cdh5.3.1).

Could you give some hints on what could cause that, and where to search
for a solution?

Regards
Marcin

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[POWERED BY] Can you add Big Industries to the Powered by Spark page?

2015-02-18 Thread Emre Sevinc
Hello,

Could you please add Big Industries to the Powered by Spark page at
https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ?


Company Name: Big Industries

URL:  http://www.bigindustries.be/

Spark Components: Spark Streaming

Use Case: Big Content Platform

Summary: The Big Content Platform is a business-to-business content asset
management service providing a searchable, aggregated source of live news
feeds, public domain media and archives of content.

The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache
Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the
platform leverages public datasets like Freebase, DBpedia, Wiktionary, and
Geonames to support semantic text enrichment.



Kind regards,

Emre Sevinç
http://www.bigindustries.be/


Re: Problem with 1 master + 2 slaves cluster

2015-02-18 Thread bit1...@163.com
But I am able to run the SparkPi example:
./run-example SparkPi 1000 --master spark://192.168.26.131:7077

Result:Pi is roughly 3.14173708



bit1...@163.com
 
From: bit1...@163.com
Date: 2015-02-18 16:29
To: user
Subject: Problem with 1 master + 2 slaves cluster
Hi sparkers,
I set up a Spark (1.2.1) cluster with 1 master and 2 slaves and then started 
them up; everything looks to be running normally.
On the master node, I run the spark-shell with the following steps:

bin/spark-shell --master spark://192.168.26.131:7077
scala> var rdd = sc.textFile("file:///home/hadoop/history.txt.used.byspark", 7)
rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 5).map(x => (x._2, 
x._1)).sortByKey(false).map(x => (x._2, 
x._1)).saveAsTextFile("file:///home/hadoop/output")

After the application finishes running, there is no word-count-related output. 
An output directory does appear on each slave node, but it contains only a 
_temporary subdirectory.

Any ideas? Thanks!






Re: Re: Problem with 1 master + 2 slaves cluster

2015-02-18 Thread Akhil Das
When you give file://, reading requires that all slaves have that
path/file available locally on their system. It's OK to give file:// when
you run your application in local mode (like master=local[*]).

Thanks
Best Regards

On Wed, Feb 18, 2015 at 2:58 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 On Wed, Feb 18, 2015 at 10:23 AM, bit1...@163.com bit1...@163.com wrote:

 Sure, thanks Akhil.
 A further question : Is local file system(file:///) not supported in
 standalone cluster?



 FYI: I'm able to write to local file system (via HDFS API and using
 file:/// notation) when using Spark.


 --
 Emre Sevinç
 http://www.bigindustries.be/




Re: Cannot access Spark web UI

2015-02-18 Thread Arush Kharbanda
It seems like it's not able to get a port it needs. Are you sure that
the required port is available? In which logs did you find this error?

On Wed, Feb 18, 2015 at 2:21 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 The error says Cannot assign requested address. This means that you need
 to use the correct address for one of your network interfaces or 0.0.0.0 to
 accept connections from all interfaces. Can you paste your spark-env.sh
 file and /etc/hosts file.

 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 2:06 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Experts,

 I am running a spark-streaming app inside YARN. I have Spark History
 server running as well (Do we need it running to access UI?).

 The app is running fine as expected but the Spark's web UI is not
 accessible.

 When I try to access the ApplicationMaster of the Yarn application I get
 the below error.

 This looks very similar to
 https://issues.apache.org/jira/browse/SPARK-5837 but instead of 
 java.net.ConnectException:
 Connection refused I am getting java.net.BindException: Cannot assign
 requested address as shown below.

 Please let me know if you have faced / fixed this issue, any help is
 greatly appreciated.


 *Exception*

 HTTP ERROR 500

 Problem accessing /proxy/application_1424161379156_0001/. Reason:

 Cannot assign requested address

 Caused by:

 java.net.BindException: Cannot assign requested address

Re: Spark Streaming output cannot be used as input?

2015-02-18 Thread Sean Owen
To clarify, sometimes in the world of Hadoop people freely refer to an
output 'file' when it's really a directory containing 'part-*' files which
are pieces of the file. It's imprecise but that's the meaning. I think the
scaladoc may be referring to 'the path to the file, which includes this
parent dir, is generated ...' In an inherently distributed system, you want
to distribute writes and reads, so big files are really made of logical
files within a directory.

There is a JIRA open to support nested dirs which has been languishing:
https://issues.apache.org/jira/browse/SPARK-3586
I'm hoping to pursue that again with help from tdas after 1.3.
That's probably the best solution.

An alternative is to not use the file system as a sort of message queue,
and instead use something like Kafka. It has a lot of other benefits but
maybe it's not feasible to add this to your architecture.

You can merge the files with HDFS APIs without much trouble. The dirs will
be named consistently according to time and are something you can also
query for.
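
For illustration, a minimal Scala sketch of that merge step with the Hadoop
FileSystem API; the paths and batch timestamp are made up, not from this thread:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

// Merge the part-* files of one batch directory (named prefix-TIME_IN_MS.suffix)
// into a single file somewhere else.
object MergeBatchOutput {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val batchDir = new Path("hdfs:///streaming/out/prefix-1424242800000.suffix")
    val merged   = new Path("hdfs:///streaming/merged/1424242800000.json")
    val fs = batchDir.getFileSystem(conf)
    // copyMerge concatenates every file under batchDir into the single merged file
    FileUtil.copyMerge(fs, batchDir, fs, merged, /* deleteSource = */ false, conf, null)
  }
}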

Making 1 partition has implications for parallelism of your job.

Emre, I think I see what you're getting at but you have the map +
materialize pattern which i think doesn't have the right guarantees about
re-execution. Why not foreachRDD?

Yes you can also consider collecting the whole RDD in foreachRDD and doing
what you like, including writing to one file. But that would only work if
the data is always small in each RDD.
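
As a sketch of that last option (assuming every batch is small enough to
collect on the driver; the names and paths are illustrative):

import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.streaming.dstream.DStream

// Write each micro-batch as a single file instead of a part-* directory.
def writeEachBatchToOneFile(stream: DStream[String], outDir: String): Unit = {
  stream.foreachRDD { (rdd, time) =>
    val records = rdd.collect()                   // pulls the whole batch to the driver
    if (records.nonEmpty) {
      val path = new Path(s"$outDir/batch-${time.milliseconds}.json")
      val out = path.getFileSystem(new Configuration()).create(path)
      try out.write(records.mkString("\n").getBytes(StandardCharsets.UTF_8))
      finally out.close()
    }
  }
}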

On Wed, Feb 18, 2015 at 8:50 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Jose,

 We've hit the same issue a couple of months ago. It is possible to write
 directly to files instead of creating directories, but it is not
 straightforward, and I haven't seen any clear demonstration in books,
 tutorials, etc.

 We do something like:

 SparkConf sparkConf = new SparkConf().setAppName(appName);
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(batchInterval));
 JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
 MyModuleApp.process(stream);

 And then in the process method:

 @Override
 public void process(JavaDStream<String> inStream) {
   JavaDStream<String> json = inStream.map(
       new MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir));
   forceOutput(json);
 }

 This, in turn, calls the following (I've removed the irrelevant lines to
 focus on writing):

 public class MyModuleWorker implements Function<String,String> {

   public String call(String json) {
     // process the data and then write it
     writeJSON(json, validatedJSONoutputDir_);
   }
 }

 And the writeJSON method is:

 public static final void writeJSON(String json, String jsonDirPath) throws IOException {
   String jsonFileName = jsonDirPath + "/" + UUID.randomUUID().toString() + ".json.tmp";
   URI uri = URI.create(jsonFileName);
   Configuration conf = new Configuration();
   FileSystem fileSystem = FileSystem.get(uri, conf);
   FSDataOutputStream out = fileSystem.create(new Path(uri));
   out.write(json.getBytes(StandardCharsets.UTF_8));
   out.close();
   fileSystem.rename(new Path(uri),
       new Path(URI.create(jsonDirPath + "/" + UUID.randomUUID().toString() + ".json")));
 }


 Using a similar technique you might be able to achieve your objective.

 Kind regards,

 Emre Sevinç
 http://www.bigindustries.be/



 On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez jfernan...@sdl.com
 wrote:

  Hello folks,



 Our intended use case is:

 -  Spark Streaming app #1 reads from RabbitMQ and output to HDFS

 -  Spark Streaming app #2 reads #1’s output and stores the data
 into Elasticsearch



 The idea behind this architecture is that if Elasticsearch is down due to
 an upgrade or system error we don’t have to stop reading messages from the
 queue. We could also scale each process separately as needed.



 After a few hours research my understanding is that Spark Streaming
 outputs files in a *directory* for which you provide the prefix and suffix.
 This is despite the ScalaDoc for DStream saveAsObjectFiles suggesting
 otherwise:



   /**

* Save each RDD in this DStream as a Sequence file of serialized
 objects.

* The file name at each batch interval is generated based on `prefix`
 and

* `suffix`: prefix-TIME_IN_MS.suffix.

*/



 Spark Streaming can monitor an HDFS directory for files but subfolders
 are not supported. So as far as I can tell, it is not possible to use Spark
 Streaming output as input for a different Spark Streaming app without
 somehow performing a separate operation in the middle.



 Am I missing something obvious? I’ve read some suggestions like using
 Hadoop to merge the directories (whose names I don’t see how you would
 know) and to reduce the partitions to 1 (which wouldn’t help).



 Any other suggestions? What is the expected pattern a developer would
 follow that would make Spark Streaming’s output format usable?





 www.sdl.com
 

Re: Why groupBy is slow?

2015-02-18 Thread shahab
Thanks  Francois for the comment and useful link. I understand the problem
better now.

best,
/Shahab

On Wed, Feb 18, 2015 at 10:36 AM, francois.garil...@typesafe.com wrote:

 In a nutshell : because it’s moving all of your data, compared to other
 operations (e.g. reduce) that summarize it in one form or another before
 moving it.

 For the longer answer:

 http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

 —
 FG


 On Wed, Feb 18, 2015 at 10:33 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 Based on what I could see in the Spark UI, I noticed that  groupBy
 transformation is quite slow (taking a lot of time) compared to other
 operations.

 Is there any reason that groupBy is slow?

 shahab





Re: Is spark streaming +MlLib for online learning?

2015-02-18 Thread mucaho
Hi

What is the general consensus/roadmap for implementing additional online /
streamed trainable models?

Apache Spark 1.2.1 currently supports streaming linear regression and
clustering, although other streaming linear methods are planned according to
the issue tracker.
However, I cannot find any details on the issue tracker about online
training of a collaborative filter. Judging from another mailing list
discussion
http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ce07aa61e-eeb9-4ded-be3e-3f04003e4...@storefront.be%3E
incremental training should be possible for ALS. Any plans for the future?
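
For reference, a minimal sketch of the streaming training that 1.2.x already
ships for linear regression (the feature count is made up):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.dstream.DStream

// The model is updated incrementally on every incoming batch of labeled points.
def trainOnline(training: DStream[LabeledPoint], test: DStream[LabeledPoint]): Unit = {
  val numFeatures = 3
  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures))
  model.trainOn(training)                        // incremental update per batch
  model.predictOnValues(test.map(lp => (lp.label, lp.features))).print()
}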

Regards
mucaho



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-spark-streaming-MlLib-for-online-learning-tp19701p21698.html



Re: Problem with 1 master + 2 slaves cluster

2015-02-18 Thread Akhil Das
Since the cluster is standalone, you are better off reading/writing to hdfs
instead of local filesystem.

Thanks
Best Regards

On Wed, Feb 18, 2015 at 2:32 PM, bit1...@163.com bit1...@163.com wrote:

 But I am able to run the SparkPi example:
 ./run-example SparkPi 1000 --master spark://192.168.26.131:7077

 Result:Pi is roughly 3.14173708

 --
 bit1...@163.com


 *From:* bit1...@163.com
 *Date:* 2015-02-18 16:29
 *To:* user user@spark.apache.org
 *Subject:* Problem with 1 master + 2 slaves cluster
 Hi sparkers,
 I setup a spark(1.2.1) cluster with 1 master and 2 slaves, and then
 startup them, everything looks running normally.
 In the master node, I run the spark-shell, with the following steps:

 bin/spark-shell --master spark://192.168.26.131:7077
 scala> var rdd = sc.textFile("file:///home/hadoop/history.txt.used.byspark", 7)
 rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 5).map(x => (x._2,
 x._1)).sortByKey(false).map(x => (x._2,
 x._1)).saveAsTextFile("file:///home/hadoop/output")

 After finishing running the application, there is no word count related
 output, there does exist an output directory appear on each slave node,
  but there is only a _temporary subdirectory

 Any ideas? Thanks!

 --




Re: How to pass parameters to a spark-jobserver Scala class?

2015-02-18 Thread Vasu C
Hi Sasi,

Forgot to mention job server uses Typesafe Config library.  The input is
JSON, you can find syntax in below link

https://github.com/typesafehub/config
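
A tiny Scala sketch of what that looks like; the key name "input.string" is
just an example, not part of the job server API:

import com.typesafe.config.ConfigFactory

// The job input is parsed with Typesafe Config, so plain JSON (or HOCON) works.
val config = ConfigFactory.parseString("""{ "input": { "string": "a b c a b" } }""")
val words  = config.getString("input.string").split(" ")
println(words.mkString(", "))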



Regards,
   Vasu C



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-parameters-to-a-spark-jobserver-Scala-class-tp21671p21695.html



Re: Does spark *always* fork its workers?

2015-02-18 Thread Sean Owen
Forked, meaning, different from the driver? Spark will in general not
even execute your tasks on the same machine as your driver. The driver can
choose to execute a task locally in some cases.

You are creating non-daemon threads in your function? Your function can and
should clean up after itself. Just use try-finally to shut down your pool.
Or you can consider whether you can just make daemon threads. There's no
separate mechanism; you just write this into your function.

I assume you're looking at something like foreachPartition and mean
'mapper' by way of analogy to MapReduce. This works then. But if you really
mean mapPartitions, beware that it is not an action and is lazily
evaluated.

Also consider not parallelizing manually -- is there really a need for
that? It's much simpler to let Spark manage it if possible.
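
For illustration, a minimal Scala sketch of that try-finally cleanup inside
foreachPartition; the thread pool just stands in for whatever resource needs a
shutdown() call:

import java.util.concurrent.Executors
import org.apache.spark.{SparkConf, SparkContext}

object CleanupExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cleanup-example"))
    val rdd = sc.parallelize(1 to 100, 4)

    rdd.foreachPartition { records =>
      // Stand-in for any per-partition resource that starts non-daemon threads.
      val pool = Executors.newFixedThreadPool(2)
      try {
        records.foreach { r =>
          pool.submit(new Runnable { def run(): Unit = println(s"processed $r") })
        }
      } finally {
        pool.shutdown() // always runs, even if processing throws, so the JVM can exit
      }
    }
    sc.stop()
  }
}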


On Wed, Feb 18, 2015 at 8:26 AM, Kevin Burton bur...@spinn3r.com wrote:

 I want to map over a Cassandra table in Spark but my code that executes
 needs a shutdown() call to return any threads, release file handles, etc.

 Will spark always execute my mappers as a forked process? And if so how do
 I handle threads preventing the JVM from terminating.

 It would be nice if there was a way to clean up after yourself gracefully
 in map jobs but I don’t think that exists right now.

 --

 Founder/CEO Spinn3r.com
 Location: *San Francisco, CA*
 blog: http://burtonator.wordpress.com
 … or check out my Google+ profile
 https://plus.google.com/102718274791889610666/posts
 http://spinn3r.com




Re: Why groupBy is slow?

2015-02-18 Thread francois . garillot
In a nutshell : because it’s moving all of your data, compared to other 
operations (e.g. reduce) that summarize it in one form or another before moving 
it.




For the longer answer:

http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
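
For a concrete illustration, word count written both ways; a minimal sketch,
not taken from the linked page:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// reduceByKey combines values map-side before the shuffle;
// groupByKey ships every (word, 1) pair across the network first.
def countsWithReduce(words: RDD[String]): RDD[(String, Int)] =
  words.map((_, 1)).reduceByKey(_ + _)

def countsWithGroup(words: RDD[String]): RDD[(String, Int)] =
  words.map((_, 1)).groupByKey().mapValues(_.sum)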



—
FG

On Wed, Feb 18, 2015 at 10:33 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,
 Based on what I could see in the Spark UI, I noticed that  groupBy
 transformation is quite slow (taking a lot of time) compared to other
 operations.
 Is there any reason that groupBy is slow?
 shahab

Re: Re: Problem with 1 master + 2 slaves cluster

2015-02-18 Thread bit1...@163.com
Sure, thanks Akhil. 
A further question : Is local file system(file:///) not supported in standalone 
cluster? 



bit1...@163.com
 
From: Akhil Das
Date: 2015-02-18 17:35
To: bit1...@163.com
CC: user
Subject: Re: Problem with 1 master + 2 slaves cluster
Since the cluster is standalone, you are better off reading from and writing to HDFS
instead of the local filesystem.
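A minimal sketch of the same flow against HDFS, in Java (the namenode address and
paths are illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HdfsInsteadOfLocal {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("hdfs-io").setMaster("spark://192.168.26.131:7077"));
    // every worker resolves the same hdfs:// path, unlike file:/// which is
    // local to each node and leaves partial output scattered across the slaves
    JavaRDD<String> lines = sc.textFile("hdfs://namenode:8020/user/hadoop/history.txt");
    lines.saveAsTextFile("hdfs://namenode:8020/user/hadoop/output");
    sc.stop();
  }
}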

Thanks
Best Regards

On Wed, Feb 18, 2015 at 2:32 PM, bit1...@163.com bit1...@163.com wrote:
But I am able to run the SparkPi example:
./run-example SparkPi 1000 --master spark://192.168.26.131:7077

Result:Pi is roughly 3.14173708



bit1...@163.com
 
From: bit1...@163.com
Date: 2015-02-18 16:29
To: user
Subject: Problem with 1 master + 2 slaves cluster
Hi sparkers,
I set up a Spark (1.2.1) cluster with 1 master and 2 slaves and then started
them; everything looks to be running normally.
In the master node, I run the spark-shell, with the following steps:

bin/spark-shell --master spark://192.168.26.131:7077
scala> var rdd = sc.textFile("file:///home/hadoop/history.txt.used.byspark", 7)
rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 5).map(x => (x._2, 
x._1)).sortByKey(false).map(x => (x._2, 
x._1)).saveAsTextFile("file:///home/hadoop/output")

After the application finishes running, there is no word-count-related output; 
an output directory does appear on each slave node, but it contains 
only a _temporary subdirectory.

Any ideas? Thanks!







Why groupBy is slow?

2015-02-18 Thread shahab
Hi,

Based on what I could see in the Spark UI, I noticed that  groupBy
transformation is quite slow (taking a lot of time) compared to other
operations.

Is there any reason that groupBy is slow?

shahab


issue Running Spark Job on Yarn Cluster

2015-02-18 Thread sachin Singh
Hi,
I want to run my spark Job in Hadoop yarn Cluster mode,
I am using below command -
spark-submit --master yarn-cluster --driver-memory 1g --executor-memory 1g
--executor-cores 1 --class com.dc.analysis.jobs.AggregationJob
sparkanalitic.jar param1 param2 param3
I am getting the error below. Kindly suggest what's going wrong, and whether the
command is proper or not. Thanks in advance.

Exception in thread main org.apache.spark.SparkException: Application
finished with failed status
at
org.apache.spark.deploy.yarn.ClientBase$class.run(ClientBase.scala:509)
at org.apache.spark.deploy.yarn.Client.run(Client.scala:35)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:139)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Creating RDDs from within foreachPartition() [Spark-Streaming]

2015-02-18 Thread t1ny
Hi all,

I am trying to create RDDs from within /rdd.foreachPartition()/ so I can
save these RDDs to ElasticSearch on the fly :

stream.foreachRDD(rdd => {
    rdd.foreachPartition {
      iterator => {
        val sc = rdd.context
        iterator.foreach {
          case (cid, sid, ts) => {

            [...]

            sc.makeRDD(...).saveToEs(...)  // <- throws a
NullPointerException (sc is null)
          }
        }
      }
    }
})

Unfortunately this doesn't work as I can't seem to be able to access the
SparkContext from anywhere within /foreachPartition()/. The code above
throws a NullPointerException, but if I change it to ssc.sc.makeRDD (where
ssc is the StreamingContext object created in the main function, outside of
/foreachPartition/) then I get a NotSerializableException.

What is the correct way to do this ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Creating-RDDs-from-within-foreachPartition-Spark-Streaming-tp21700.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Job Fails on sortByKey

2015-02-18 Thread athing goingon
hi, I have a job that fails on a shuffle during a sortByKey, on a
relatively small dataset. http://pastebin.com/raw.php?i=1LxiG4rY


Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Dmitry Goldenberg
Thanks, Cody. Yes, I originally started off by looking at that but I get a
compile error if I try and use that approach: constructor JdbcRDD in class
JdbcRDD<T> cannot be applied to given types.  Not to mention that
JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last
argument).

Wonder if it's a JDK version issue, I'm using 1.7.

So I've got this, which doesn't compile

JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
new SparkContext(conf),
new JdbcRDD.ConnectionFactory() {
public Connection getConnection() throws SQLException {
Connection conn = null;
try {
Class.forName(JDBC_DRIVER);
conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
} catch (ClassNotFoundException ex) {
throw new RuntimeException("Error while loading JDBC driver.", ex);
}
return conn;
}
},
"SELECT * FROM EMPLOYEES",
0L,
1000L,
10,
new Function<ResultSet, Row>() {
public Row call(ResultSet r) throws Exception {
return null; // have some actual logic here...
}
},
scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));

The other approach was mimicking the DbConnection class from this post:
http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
It got around any of the compilation issues but then I got the runtime
error where Spark wouldn't recognize the db connection class as a
scala.Function0.



On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org wrote:

 Take a look at


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java



 On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 I'm reading data from a database using JdbcRDD, in Java, and I have an
 implementation of Function0<Connection> whose instance I supply as the
 'getConnection' parameter into the JdbcRDD constructor. Compiles fine.

 The definition of the class/function is as follows:

   public class GetDbConnection extends AbstractFunction0<Connection>
 implements Serializable

 where scala.runtime.AbstractFunction0 extends scala.Function0.

 At runtime, I get an exception as below. Does anyone have an idea as to
 how
 to resolve this/work around it? Thanks.

 I'm running Spark 1.2.1 built for Hadoop 2.4.


 Exception in thread main org.apache.spark.SparkException: Job aborted
 due
 to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure:
 Lost task 3.0 in stage 0.0 (TID 3, localhost):
 java.lang.ClassCastException:
 cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to
 field
 org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection
 of
 type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
 at

 java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
 at
 java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at

 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 at

 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)

 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at

 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at

 

Re: Job Fails on sortByKey

2015-02-18 Thread Saisai Shao
Would you mind explaining your problem a little more specifically, like
exceptions you met or others, so someone who has experiences on it could
give advice.

Thanks
Jerry

2015-02-19 1:08 GMT+08:00 athing goingon athinggoin...@gmail.com:

 hi, I have a job that fails on a shuffle during a sortByKey, on a
 relatively small dataset. http://pastebin.com/raw.php?i=1LxiG4rY



Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Cody Koeninger
Take a look at

https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java



On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com
wrote:

 I'm reading data from a database using JdbcRDD, in Java, and I have an
 implementation of Function0Connection whose instance I supply as the
 'getConnection' parameter into the JdbcRDD constructor. Compiles fine.

 The definition of the class/function is as follows:

   public class GetDbConnection extends AbstractFunction0Connection
 implements Serializable

 where scala.runtime.AbstractFunction0 extends scala.Function0.

 At runtime, I get an exception as below. Does anyone have an idea as to how
 to resolve this/work around it? Thanks.

 I'm running Spark 1.2.1 built for Hadoop 2.4.


 Exception in thread main org.apache.spark.SparkException: Job aborted due
 to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure:
 Lost task 3.0 in stage 0.0 (TID 3, localhost):
 java.lang.ClassCastException:
 cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to
 field
 org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection
 of
 type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
 at

 java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
 at
 java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at

 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 at

 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)

 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)





 --
 View this message in context:
 

Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Cody Koeninger
Is sc there a SparkContext or a JavaSparkContext?  The compilation error
seems to indicate the former, but JdbcRDD.create expects the latter

On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg 
dgoldenberg...@gmail.com wrote:

 I have tried that as well, I get a compile error --

 [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for
 create(SparkContext,anonymous
 ConnectionFactory,String,int,int,int,anonymous
 FunctionResultSet,Integer)

 The code is a copy and paste:

 JavaRDDInteger jdbcRDD = JdbcRDD.create(
   sc,
   new JdbcRDD.ConnectionFactory() {
 public Connection getConnection() throws SQLException {
   return
 DriverManager.getConnection(jdbc:derby:target/JavaJdbcRDDSuiteDb);
 }
   },
   SELECT DATA FROM FOO WHERE ? = ID AND ID = ?,
   1, 100, 1,
   new FunctionResultSet, Integer() {
 public Integer call(ResultSet r) throws Exception {
   return r.getInt(1);
 }
   }
 );

 The other thing I've tried was to define a static class locally for
 GetConnection and use the JdbcCreate constructor. This got around the
 compile issues but blew up at runtime with NoClassDefFoundError:
 scala/runtime/AbstractFunction0 !

 JdbcRDDRow jdbcRDD = new JdbcRDDRow(
 sc,
 (AbstractFunction0Connection) new DbConn(), // had to cast or a compile
 error
 SQL_QUERY,
 0L,
 1000L,
 10,
 new MapRow(),
 ROW_CLASS_TAG);
 // DbConn is defined as public static class DbConn extends
 AbstractFunction0Connection implements Serializable

 On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org
 wrote:

 That test I linked


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90

 is calling a static method JdbcRDD.create, not new JdbcRDD.  Is that what
 you tried doing?

 On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Thanks, Cody. Yes, I originally started off by looking at that but I get
 a compile error if I try and use that approach: constructor JdbcRDD in
 class JdbcRDDT cannot be applied to given types.  Not to mention that
 JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last
 argument).

 Wonder if it's a JDK version issue, I'm using 1.7.

 So I've got this, which doesn't compile

 JdbcRDDRow jdbcRDD = new JdbcRDDRow(
 new SparkContext(conf),
 new JdbcRDD.ConnectionFactory() {
 public Connection getConnection() throws SQLException {
 Connection conn = null;
 try {
 Class.forName(JDBC_DRIVER);
 conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
 } catch (ClassNotFoundException ex) {
 throw new RuntimeException(Error while loading JDBC driver., ex);
 }
 return conn;
 }
 },
 SELECT * FROM EMPLOYEES,
 0L,
 1000L,
 10,
 new FunctionResultSet, Row() {
 public Row call(ResultSet r) throws Exception {
 return null; // have some actual logic here...
 }
 },
 scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));

 The other approach was mimicing the DbConnection class from this post:
 http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
 It got around any of the compilation issues but then I got the runtime
 error where Spark wouldn't recognize the db connection class as a
 scala.Function0.



 On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Take a look at


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java



 On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com
  wrote:

 I'm reading data from a database using JdbcRDD, in Java, and I have an
 implementation of Function0Connection whose instance I supply as the
 'getConnection' parameter into the JdbcRDD constructor. Compiles fine.

 The definition of the class/function is as follows:

   public class GetDbConnection extends AbstractFunction0Connection
 implements Serializable

 where scala.runtime.AbstractFunction0 extends scala.Function0.

 At runtime, I get an exception as below. Does anyone have an idea as
 to how
 to resolve this/work around it? Thanks.

 I'm running Spark 1.2.1 built for Hadoop 2.4.


 Exception in thread main org.apache.spark.SparkException: Job
 aborted due
 to stage failure: Task 3 in stage 0.0 failed 1 times, most recent
 failure:
 Lost task 3.0 in stage 0.0 (TID 3, localhost):
 java.lang.ClassCastException:
 cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection
 to
 field
 org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection
 of
 type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
 at

 java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
 at

 java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
 at

 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
   

build spark for cdh5

2015-02-18 Thread Koert Kuipers
does anyone have the right maven invocation for cdh5 with yarn?
i tried:
$ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn -DskipTests clean
package
$ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn test

it builds and passes tests just fine, but when i deploy on cluster and i
try to run SparkPi i get:
Caused by: java.lang.VerifyError: class
org.apache.hadoop.yarn.proto.YarnServiceProtos$GetApplicationReportRequestProto
overrides final method getUnknownFields.()Lcom/google/p\
rotobuf/UnknownFieldSet;

so clearly i am doing something wrong. something with protobuf 2.4 versus
2.5

i do not want to use the cloudera version of spark for cdh 5 (it includes
the wrong akka version for me)

thanks


Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Marcelo Vanzin
Hello,

On Tue, Feb 17, 2015 at 8:53 PM, dgoldenberg dgoldenberg...@gmail.com wrote:
 I've tried setting spark.files.userClassPathFirst to true in SparkConf in my
 program, also setting it to true in  $SPARK-HOME/conf/spark-defaults.conf as

Is the code in question running on the driver or in some executor?
spark.files.userClassPathFirst only applies to executors. To override
classes in the driver's classpath, you need to modify
spark.driver.extraClassPath (or --driver-class-path in spark-submit's
command line).
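For example, a submit command along these lines (the class name, jar names and
paths are purely illustrative) prepends a specific jar to the driver's classpath:

spark-submit \
  --class com.example.MyJob \
  --driver-class-path /path/to/preferred-dependency.jar \
  my-job-assembly.jar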

In 1.3 there's an option similar to spark.files.userClassPathFirst
that works for the driver too.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: build spark for cdh5

2015-02-18 Thread Koert Kuipers
thanks! my bad

On Wed, Feb 18, 2015 at 2:00 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Koert,

 You should be using -Phadoop-2.3 instead of -Phadoop2.3.

 -Sandy

 On Wed, Feb 18, 2015 at 10:51 AM, Koert Kuipers ko...@tresata.com wrote:

 does anyone have the right maven invocation for cdh5 with yarn?
 i tried:
 $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn -DskipTests
 clean package
 $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn test

 it builds and passes tests just fine, but when i deploy on cluster and i
 try to run SparkPi i get:
 Caused by: java.lang.VerifyError: class
 org.apache.hadoop.yarn.proto.YarnServiceProtos$GetApplicationReportRequestProto
 overrides final method getUnknownFields.()Lcom/google/p\
 rotobuf/UnknownFieldSet;

 so clearly i am doing something wrong. something with protobuf 2.4 versus
 2.5

 i do not want to use the cloudera version of spark for cdh 5 (it includes
 the wrong akka version for me)

 thanks





Re: Tableau beta connector

2015-02-18 Thread ganterm
Ashutosh,

Were you able to figure this out? I am having the exact same question. 
I think the answer is to use Spark SQL to create/load a table in Hive (e.g.
execute the HiveQL CREATE TABLE statement), but I am not sure. I am hoping for
something simpler than that. 

Anybody? 

Thanks!  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512p21709.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Juan Rodríguez Hortalá
Hi Sean,

Thanks a lot for your answer. That explains it, as I was creating thousands
of RDDs, so I guess the communication overhead was the reason why the Spark
job was freezing. After changing the code to use RDDs of pairs and
aggregateByKey it works just fine, and quite fast.
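For reference, a minimal sketch of that shape in Java, computing per-key statistics
with aggregateByKey and StatCounter instead of building one RDD per key (the key and
value types are illustrative):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.util.StatCounter;

public class PerKeyStats {
  public static JavaPairRDD<String, StatCounter> stats(JavaPairRDD<String, Double> data) {
    // each resulting StatCounter exposes count(), mean(), stdev(), etc. for its key
    return data.aggregateByKey(
        new StatCounter(),
        new Function2<StatCounter, Double, StatCounter>() {
          public StatCounter call(StatCounter acc, Double v) {
            return acc.merge(v);
          }
        },
        new Function2<StatCounter, StatCounter, StatCounter>() {
          public StatCounter call(StatCounter a, StatCounter b) {
            return a.merge(b);
          }
        });
  }
}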

Again, thanks a lot for your help.

Greetings,

Juan

2015-02-18 15:35 GMT+01:00 Sean Owen so...@cloudera.com:

 At some level, enough RDDs creates hundreds of thousands of tiny
 partitions of data each of which creates a task for each stage. The
 raw overhead of all the message passing can slow things down a lot. I
 would not design something to use an RDD per key. You would generally
 use key by some value you want to divide and filter on, and then use a
 *ByKey to do your work.

 You don't work with individual RDDs this way, but usually that's good
 news. You usually have a lot more flexibility operating just in pure
 Java / Scala to do whatever you need inside your function.

 On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá
 juan.rodriguez.hort...@gmail.com wrote:
  Hi Paweł,
 
  Thanks a lot for your answer. I finally got the program to work by using
  aggregateByKey, but I was wondering why creating thousands of RDDs
 doesn't
  work. I think that could be interesting for using methods that work on
 RDDs
  like for example JavaDoubleRDD.stats() (
 
 http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29
 ).
  If the groups are small then I can chain groupBy(), collect(),
 parallelize()
  and stats(), but that is quite inefficient because it implies moving
 data to
  and from the driver, and also doesn't scale to big groups; on the other
 hand
  if I use aggregateByKey or a similar function then I cannot use stats()
 so I
  have to reimplement it. In general I was looking for a way to reuse other
  functions that I have that work on RDDs, for using them on groups of
 data in
  a RDD, because I don't see a how to directly apply them to each of the
  groups in a pair RDD.
 
  Again, thanks a lot for your answer,
 
  Greetings,
 
  Juan Rodriguez
 
 
 
 
  2015-02-18 14:56 GMT+01:00 Paweł Szulc paul.sz...@gmail.com:
 
  Maybe you can omit using grouping all together with groupByKey? What is
  your next step after grouping elements by key? Are you trying to reduce
  values? If so then I would recommend using some reducing functions like
 for
  example reduceByKey or aggregateByKey. Those will first reduce value for
  each key locally on each node before doing actual IO over the network.
 There
  will also be no grouping phase so you will not run into memory issues.
 
  Please let me know if that helped
 
  Pawel Szulc
  @rabbitonweb
  http://www.rabbitonweb.com
 
 
  On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá
  juan.rodriguez.hort...@gmail.com wrote:
 
  Hi,
 
  I'm writing a Spark program where I want to divide a RDD into different
  groups, but the groups are too big to use groupByKey. To cope with
 that,
  since I know in advance the list of keys for each group, I build a map
 from
  the keys to the RDDs that result from filtering the input RDD to get
 the
  records for the corresponding key. This works when I have a small
 number of
  keys, but for big number of keys (tens of thousands) the execution gets
  stuck, without issuing any new Spark stage. I suspect the reason is
 that the
  Spark scheduler is not able to handle so many RDDs. Does it make
 sense? I'm
  rewriting the program to use a single RDD of pairs, with cached
 partions,
  but I wanted to be sure I understand the problem here.
 
  Thanks a lot in advance,
 
  Greetings,
 
  Juan Rodriguez
 
 
 



Spark data incorrect when more than 200 tasks

2015-02-18 Thread lbierman
I'm fairly new to Spark. 
We have data in avro files on hdfs.
We are trying to load up all the avro files (28 gigs worth right now) and do
an aggregation.

When we have less than 200 tasks the data all runs and produces the proper
results. If there are more than 200 tasks (as stated in the logs by the
TaskSetManager) the data seems to only group when it reads in the RDD from
hdfs by the first record in the avro file. 

If I set: spark.shuffle.sort.bypassMergeThreshold greater than 200 data
seems to work. I don't understand why or how?

Here is the relevant code pieces:
JavaSparkContext context = new JavaSparkContext(
    new SparkConf()
        .setAppName(AnalyticsJob.class.getSimpleName())
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
);

context.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive",
    "true");
context.hadoopConfiguration().set("mapreduce.input.fileinputformat.inputdir",
    job.inputDirectory);

JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
    context.newAPIHadoopRDD(
        context.hadoopConfiguration(),
        AvroKeyInputFormat.class,
        AvroKey.class,
        NullWritable.class
    ).keys())
    .map(event -> event.datum())
    .filter(key -> { return
        Optional.ofNullable(key.getStepEventKey()).isPresent(); })
    .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
    .groupByKey()
    .map(tuple -> tuple._1());

events.persist(StorageLevel.MEMORY_AND_DISK_2());

If I do a collect on events at this point the data is not as expected and
jumbled, so when we pass it onto the next job in our pipeline for
aggregation, the data doesn't come out as expected.

The downstream tasks maps to pairs again and stores in the db.

Thanks in advance for this help.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-data-incorrect-when-more-than-200-tasks-tp21710.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Cody Koeninger
That test I linked

https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90

is calling a static method JdbcRDD.create, not new JdbcRDD.  Is that what
you tried doing?

On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg 
dgoldenberg...@gmail.com wrote:

 Thanks, Cody. Yes, I originally started off by looking at that but I get a
 compile error if I try and use that approach: constructor JdbcRDD in class
 JdbcRDDT cannot be applied to given types.  Not to mention that
 JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last
 argument).

 Wonder if it's a JDK version issue, I'm using 1.7.

 So I've got this, which doesn't compile

 JdbcRDDRow jdbcRDD = new JdbcRDDRow(
 new SparkContext(conf),
 new JdbcRDD.ConnectionFactory() {
 public Connection getConnection() throws SQLException {
 Connection conn = null;
 try {
 Class.forName(JDBC_DRIVER);
 conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
 } catch (ClassNotFoundException ex) {
 throw new RuntimeException(Error while loading JDBC driver., ex);
 }
 return conn;
 }
 },
 SELECT * FROM EMPLOYEES,
 0L,
 1000L,
 10,
 new FunctionResultSet, Row() {
 public Row call(ResultSet r) throws Exception {
 return null; // have some actual logic here...
 }
 },
 scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));

 The other approach was mimicing the DbConnection class from this post:
 http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
 It got around any of the compilation issues but then I got the runtime
 error where Spark wouldn't recognize the db connection class as a
 scala.Function0.



 On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Take a look at


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java



 On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 I'm reading data from a database using JdbcRDD, in Java, and I have an
 implementation of Function0Connection whose instance I supply as the
 'getConnection' parameter into the JdbcRDD constructor. Compiles fine.

 The definition of the class/function is as follows:

   public class GetDbConnection extends AbstractFunction0Connection
 implements Serializable

 where scala.runtime.AbstractFunction0 extends scala.Function0.

 At runtime, I get an exception as below. Does anyone have an idea as to
 how
 to resolve this/work around it? Thanks.

 I'm running Spark 1.2.1 built for Hadoop 2.4.


 Exception in thread main org.apache.spark.SparkException: Job aborted
 due
 to stage failure: Task 3 in stage 0.0 failed 1 times, most recent
 failure:
 Lost task 3.0 in stage 0.0 (TID 3, localhost):
 java.lang.ClassCastException:
 cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to
 field
 org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection
 of
 type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
 at

 java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
 at
 java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at

 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 at

 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)

 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at

 

Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Dmitry Goldenberg
I have tried that as well, I get a compile error --

[ERROR] ...SparkProto.java:[105,39] error: no suitable method found for
create(SparkContext,<anonymous
ConnectionFactory>,String,int,int,int,<anonymous
Function<ResultSet,Integer>>)

The code is a copy and paste:

JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
  sc,
  new JdbcRDD.ConnectionFactory() {
    public Connection getConnection() throws SQLException {
      return
        DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
    }
  },
  "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
  1, 100, 1,
  new Function<ResultSet, Integer>() {
    public Integer call(ResultSet r) throws Exception {
      return r.getInt(1);
    }
  }
);

The other thing I've tried was to define a static class locally for
GetConnection and use the JdbcCreate constructor. This got around the
compile issues but blew up at runtime with NoClassDefFoundError:
scala/runtime/AbstractFunction0 !

JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
sc,
(AbstractFunction0<Connection>) new DbConn(), // had to cast or a compile
error
SQL_QUERY,
0L,
1000L,
10,
new MapRow(),
ROW_CLASS_TAG);
// DbConn is defined as public static class DbConn extends
// AbstractFunction0<Connection> implements Serializable

On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote:

 That test I linked


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90

 is calling a static method JdbcRDD.create, not new JdbcRDD.  Is that what
 you tried doing?

 On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Thanks, Cody. Yes, I originally started off by looking at that but I get
 a compile error if I try and use that approach: constructor JdbcRDD in
 class JdbcRDDT cannot be applied to given types.  Not to mention that
 JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last
 argument).

 Wonder if it's a JDK version issue, I'm using 1.7.

 So I've got this, which doesn't compile

 JdbcRDDRow jdbcRDD = new JdbcRDDRow(
 new SparkContext(conf),
 new JdbcRDD.ConnectionFactory() {
 public Connection getConnection() throws SQLException {
 Connection conn = null;
 try {
 Class.forName(JDBC_DRIVER);
 conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
 } catch (ClassNotFoundException ex) {
 throw new RuntimeException(Error while loading JDBC driver., ex);
 }
 return conn;
 }
 },
 SELECT * FROM EMPLOYEES,
 0L,
 1000L,
 10,
 new FunctionResultSet, Row() {
 public Row call(ResultSet r) throws Exception {
 return null; // have some actual logic here...
 }
 },
 scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));

 The other approach was mimicing the DbConnection class from this post:
 http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
 It got around any of the compilation issues but then I got the runtime
 error where Spark wouldn't recognize the db connection class as a
 scala.Function0.



 On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Take a look at


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java



 On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 I'm reading data from a database using JdbcRDD, in Java, and I have an
 implementation of Function0Connection whose instance I supply as the
 'getConnection' parameter into the JdbcRDD constructor. Compiles fine.

 The definition of the class/function is as follows:

   public class GetDbConnection extends AbstractFunction0Connection
 implements Serializable

 where scala.runtime.AbstractFunction0 extends scala.Function0.

 At runtime, I get an exception as below. Does anyone have an idea as to
 how
 to resolve this/work around it? Thanks.

 I'm running Spark 1.2.1 built for Hadoop 2.4.


 Exception in thread main org.apache.spark.SparkException: Job aborted
 due
 to stage failure: Task 3 in stage 0.0 failed 1 times, most recent
 failure:
 Lost task 3.0 in stage 0.0 (TID 3, localhost):
 java.lang.ClassCastException:
 cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection
 to
 field
 org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection
 of
 type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
 at

 java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
 at
 java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at

 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 

Spark and Spark Streaming code sharing best practice.

2015-02-18 Thread Jean-Pascal Billaud
Hey,

It seems pretty clear that one of the strengths of Spark is being able to
share your code between your batch and streaming layers. Though, given that
Spark Streaming uses DStreams (i.e. sequences of RDDs) while Spark core uses a
single RDD, there might be some complexity associated with it.

Of course, since a DStream is a superset of RDDs, one can just run the same
code at the RDD granularity using DStream::forEachRDD. While this should
work for map, I am not sure how that can work when it comes to the reduce
phase, given that a group of keys spans multiple RDDs.

One option is to change the dataset object that a job works on. For example,
instead of passing an RDD to a class method, one passes a higher-level
object (MetaRDD) that wraps around either an RDD or a DStream depending on the
context. At this point the job calls its regular maps, reduces and so on, and
the MetaRDD wrapper delegates accordingly.

Just would like to know the official best practice from the spark community
though.

Thanks,


Re: Spark and Spark Streaming code sharing best practice.

2015-02-18 Thread Arush Kharbanda
I find monoids pretty useful in this respect, basically separating out the
logic in a monoid and then applying the logic to either a stream or a
batch. A list of such practices could be really useful.
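A minimal sketch of that separation (plain RDD-to-RDD logic reused directly in
a batch job and applied per micro-batch via transform in streaming; the class and
method names are illustrative):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;

public class SharedLogic {
  // the shared piece: plain RDD in, plain RDD out
  public static JavaRDD<String> errorsOnly(JavaRDD<String> lines) {
    return lines.filter(new Function<String, Boolean>() {
      public Boolean call(String line) {
        return line.contains("ERROR");
      }
    });
  }

  // the streaming side reuses the exact same function for every micro-batch
  public static JavaDStream<String> errorsOnly(JavaDStream<String> lines) {
    return lines.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
      public JavaRDD<String> call(JavaRDD<String> rdd) {
        return errorsOnly(rdd);
      }
    });
  }
}

Note that reduce-style logic shared this way is scoped to each micro-batch; for
state that spans batches you would still reach for updateStateByKey or windowed
operations on the DStream side.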

On Thu, Feb 19, 2015 at 12:26 AM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 Hey,

 It seems pretty clear that one of the strength of Spark is to be able to
 share your code between your batch and streaming layer. Though, given that
 Spark streaming uses DStream being a set of RDDs and Spark uses a single
 RDD there might some complexity associated with it.

 Of course since DStream is a superset of RDDs, one can just run the same
 code at the RDD granularity using DStream::forEachRDD. While this should
 work for map, I am not sure how that can work when it comes to reduce phase
 given that a group of keys spans across multiple RDDs.

 One of the option is to change the dataset object on which a job works on.
 For example of passing an RDD to a class method, one passes a higher level
 object (MetaRDD) that wraps around RDD or DStream depending the context. At
 this point the job calls its regular maps, reduces and so on and the
 MetaRDD wrapper would delegate accordingly.

 Just would like to know the official best practice from the spark
 community though.

 Thanks,




-- 

[image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Hamburg Apache Spark Meetup

2015-02-18 Thread Johan Beisser
If you could also add the Hamburg Apache Spark Meetup, I'd appreciate it.

http://www.meetup.com/Hamburg-Apache-Spark-Meetup/

On Tue, Feb 17, 2015 at 5:08 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Thanks! I've added you.

 Matei

 On Feb 17, 2015, at 4:06 PM, Ralph Bergmann | the4thFloor.eu 
 ra...@the4thfloor.eu wrote:

 Hi,


 there is a small Spark Meetup group in Berlin, Germany :-)
 http://www.meetup.com/Berlin-Apache-Spark-Meetup/

 Please add this group to the Meetups list at
 https://spark.apache.org/community.html


 Ralph

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: build spark for cdh5

2015-02-18 Thread Sandy Ryza
Hi Koert,

You should be using -Phadoop-2.3 instead of -Phadoop2.3.
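i.e., with the version string from the earlier mail, something like:

mvn -Phadoop-2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn -DskipTests clean package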

-Sandy

On Wed, Feb 18, 2015 at 10:51 AM, Koert Kuipers ko...@tresata.com wrote:

 does anyone have the right maven invocation for cdh5 with yarn?
 i tried:
 $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn -DskipTests clean
 package
 $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn test

 it builds and passes tests just fine, but when i deploy on cluster and i
 try to run SparkPi i get:
 Caused by: java.lang.VerifyError: class
 org.apache.hadoop.yarn.proto.YarnServiceProtos$GetApplicationReportRequestProto
 overrides final method getUnknownFields.()Lcom/google/p\
 rotobuf/UnknownFieldSet;

 so clearly i am doing something wrong. something with protobuf 2.4 versus
 2.5

 i do not want to use the cloudera version of spark for cdh 5 (it includes
 the wrong akka version for me)

 thanks



ML Transformer

2015-02-18 Thread Cesar Flores
I am working right now with the ML pipeline, which I really like.
However, in order to make real use of it, I would like to create my own
transformers that implement org.apache.spark.ml.Transformer. In order to
do that, a method from PipelineStage needs to be implemented. But this
method is private to the ml package:

private[ml] def transformSchema(schema: StructType, paramMap: ParamMap):
StructType

Can users create their own transformers? If not, will this
functionality be added in the future?

Thanks
-- 
Cesar Flores


NotSerializableException: org.apache.http.impl.client.DefaultHttpClient when trying to send documents to Solr

2015-02-18 Thread dgoldenberg
I'm using Solrj in a Spark program. When I try to send the docs to Solr, I
get the NotSerializableException on the DefaultHttpClient.  Is there a
possible fix or workaround?

I'm using Spark 1.2.1 with Hadoop 2.4, SolrJ is version 4.0.0.

final HttpSolrServer solrServer = new HttpSolrServer(SOLR_SERVER_URL);
...
JavaRDD<SolrInputDocument> solrDocs = rdd.map(new Function<Row, SolrInputDocument>() {
    public SolrInputDocument call(Row r) {
        return r.toSolrDocument();
    }
});

solrDocs.foreachPartition(new VoidFunction<Iterator<SolrInputDocument>>() {
    public void call(Iterator<SolrInputDocument> solrDocIterator) throws Exception {
        List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>();

        while (solrDocIterator.hasNext()) {
            SolrInputDocument inputDoc = solrDocIterator.next();
            batch.add(inputDoc);
            if (batch.size() >= batchSize) {
                Utils.sendBatchToSolr(solrServer, solrCollection, batch);
            }
        }
        if (!batch.isEmpty()) {
            Utils.sendBatchToSolr(solrServer, solrCollection, batch);
        }
    }
});



Exception in thread main org.apache.spark.SparkException: Task not
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:789)
at
org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:195)
at
org.apache.spark.api.java.JavaRDD.foreachPartition(JavaRDD.scala:32)
at
com.kona.motivis.spark.proto.SparkProto.execute(SparkProto.java:158)
at com.kona.motivis.spark.proto.SparkProto.main(SparkProto.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException:
org.apache.http.impl.client.DefaultHttpClient
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
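A common workaround for this kind of closure-serialization failure (offered here
only as a sketch, reusing the names from the snippet above) is to construct the
non-serializable HttpSolrServer inside foreachPartition, so it is created on the
executor and never has to be serialized at all:

solrDocs.foreachPartition(new VoidFunction<Iterator<SolrInputDocument>>() {
    public void call(Iterator<SolrInputDocument> docs) throws Exception {
        // built per partition on the executor, never shipped from the driver
        HttpSolrServer server = new HttpSolrServer(SOLR_SERVER_URL);
        try {
            List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>();
            while (docs.hasNext()) {
                batch.add(docs.next());
                if (batch.size() >= batchSize) {
                    Utils.sendBatchToSolr(server, solrCollection, batch);
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                Utils.sendBatchToSolr(server, solrCollection, batch);
            }
        } finally {
            server.shutdown();
        }
    }
});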






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-org-apache-http-impl-client-DefaultHttpClient-when-trying-to-send-documentsr-tp21713.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JsonRDD to parquet -- data loss

2015-02-18 Thread Michael Armbrust
Concurrent inserts into the same table are not supported.  I can try to
make this clearer in the documentation.

On Tue, Feb 17, 2015 at 8:01 PM, Vasu C vasuc.bigd...@gmail.com wrote:

 Hi,

 I am running spark batch processing job using spark-submit command. And
 below is my code snippet.  Basically converting JsonRDD to parquet and
 storing it in HDFS location.

 The problem I am facing is that if multiple jobs are triggered in parallel,
 even though each job executes properly (as I can see in the Spark web UI), no
 parquet file is created in the HDFS path. If 5 jobs are executed in parallel,
 then only 3 parquet files get created.

 Is this a data-loss scenario? Or am I missing something here? Please
 help me with this.

 Here tableName is unique with timestamp appended to it.


 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 val jsonRdd  = sqlContext.jsonRDD(results)

 val parquetTable = sqlContext.parquetFile(parquetFilePath)

 parquetTable.registerTempTable(tableName)

 jsonRdd.insertInto(tableName)


 Regards,

   Vasu C



Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Dmitry Goldenberg
I'm not sure what "on the driver" means, but I've tried
setting spark.files.userClassPathFirst to true,
in $SPARK-HOME/conf/spark-defaults.conf and also in the SparkConf
programmatically; it appears to be ignored. The solution was to follow
Emre's recommendation and downgrade the selected Solrj distro to 4.0.0.
That did the trick as it appears to be using the same HttpClient as one
used by Spark/Hadoop.

The Spark program I'm running is a jar I submit via a spark-submit
invocation.



On Wed, Feb 18, 2015 at 1:57 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hello,

 On Tue, Feb 17, 2015 at 8:53 PM, dgoldenberg dgoldenberg...@gmail.com
 wrote:
  I've tried setting spark.files.userClassPathFirst to true in SparkConf
 in my
  program, also setting it to true in
 $SPARK-HOME/conf/spark-defaults.conf as

 Is the code in question running on the driver or in some executor?
 spark.files.userClassPathFirst only applies to executors. To override
 classes in the driver's classpath, you need to modify
 spark.driver.extraClassPath (or --driver-class-path in spark-submit's
 command line).

 In 1.3 there's an option similar to spark.files.userClassPathFirst
 that works for the driver too.

 --
 Marcelo



Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Dmitry Goldenberg
Cody, you were right, I had a copy and paste snag where I ended up with a
vanilla SparkContext rather than a Java one.  I also had to *not* use my
function subclasses, rather just use anonymous inner classes for the
Function stuff and that got things working. I'm fully following
the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim.

Is there a clean way to refactor out the custom Function classes such as
the one for getting a db connection or mapping ResultSet data to your own
POJO's rather than doing it all inline?


On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org wrote:

 Is sc there a SparkContext or a JavaSparkContext?  The compilation error
 seems to indicate the former, but JdbcRDD.create expects the latter

 On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 I have tried that as well, I get a compile error --

 [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for
 create(SparkContext,anonymous
 ConnectionFactory,String,int,int,int,anonymous
 FunctionResultSet,Integer)

 The code is a copy and paste:

 JavaRDDInteger jdbcRDD = JdbcRDD.create(
   sc,
   new JdbcRDD.ConnectionFactory() {
 public Connection getConnection() throws SQLException {
   return
 DriverManager.getConnection(jdbc:derby:target/JavaJdbcRDDSuiteDb);
 }
   },
   SELECT DATA FROM FOO WHERE ? = ID AND ID = ?,
   1, 100, 1,
   new FunctionResultSet, Integer() {
 public Integer call(ResultSet r) throws Exception {
   return r.getInt(1);
 }
   }
 );

 The other thing I've tried was to define a static class locally for
 GetConnection and use the JdbcCreate constructor. This got around the
 compile issues but blew up at runtime with NoClassDefFoundError:
 scala/runtime/AbstractFunction0 !

 JdbcRDDRow jdbcRDD = new JdbcRDDRow(
 sc,
 (AbstractFunction0Connection) new DbConn(), // had to cast or a compile
 error
 SQL_QUERY,
 0L,
 1000L,
 10,
 new MapRow(),
 ROW_CLASS_TAG);
 // DbConn is defined as public static class DbConn extends
 AbstractFunction0Connection implements Serializable

 On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org
 wrote:

 That test I linked


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90

 is calling a static method JdbcRDD.create, not new JdbcRDD.  Is that
 what you tried doing?

 On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Thanks, Cody. Yes, I originally started off by looking at that but I
 get a compile error if I try and use that approach: constructor JdbcRDD in
 class JdbcRDDT cannot be applied to given types.  Not to mention that
 JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last
 argument).

 Wonder if it's a JDK version issue, I'm using 1.7.

 So I've got this, which doesn't compile

 JdbcRDDRow jdbcRDD = new JdbcRDDRow(
 new SparkContext(conf),
 new JdbcRDD.ConnectionFactory() {
 public Connection getConnection() throws SQLException {
 Connection conn = null;
 try {
 Class.forName(JDBC_DRIVER);
 conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
 } catch (ClassNotFoundException ex) {
 throw new RuntimeException(Error while loading JDBC driver., ex);
 }
 return conn;
 }
 },
 SELECT * FROM EMPLOYEES,
 0L,
 1000L,
 10,
 new FunctionResultSet, Row() {
 public Row call(ResultSet r) throws Exception {
 return null; // have some actual logic here...
 }
 },
 scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));

 The other approach was mimicing the DbConnection class from this post:
 http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
 It got around any of the compilation issues but then I got the runtime
 error where Spark wouldn't recognize the db connection class as a
 scala.Function0.



 On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Take a look at


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java



 On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg 
 dgoldenberg...@gmail.com wrote:

 I'm reading data from a database using JdbcRDD, in Java, and I have an
 implementation of Function0Connection whose instance I supply as the
 'getConnection' parameter into the JdbcRDD constructor. Compiles fine.

 The definition of the class/function is as follows:

   public class GetDbConnection extends AbstractFunction0Connection
 implements Serializable

 where scala.runtime.AbstractFunction0 extends scala.Function0.

 At runtime, I get an exception as below. Does anyone have an idea as
 to how
 to resolve this/work around it? Thanks.

 I'm running Spark 1.2.1 built for Hadoop 2.4.


 Exception in thread main org.apache.spark.SparkException: Job
 aborted due
 to stage failure: Task 3 in stage 0.0 

Spark can't pickle class: error cannot lookup attribute

2015-02-18 Thread Guillaume Guy
Hi,

This is a duplicate of the stack-overflow question here
http://stackoverflow.com/questions/28569374/spark-returning-pickle-error-cannot-lookup-attribute.
I hope to generate more interest  on this mailing list.


*The problem:*

I am running into some attribute lookup problems when trying to instantiate a
class within my RDD.

My workflow is quite standard:

1- Start with an RDD

2- Take each element of the RDD and instantiate an object for each

3- Reduce (I will write a method that will define the reduce operation
later on)

*Here is #2:*

class test(object):
    def __init__(self, a, b):
        self.total = a + b

a = sc.parallelize([(True, False), (False, False)])
a.map(lambda (x, y): test(x, y))

Here is the error I get:

PicklingError: Can't pickle <class '__main__.test'>: attribute lookup
__main__.test failed

I'd like to know if there is any way around it. Please answer with a
working example that achieves the intended result (i.e. creating an RDD of
objects of class test).

Thanks in advance!

*Related question:*

   - https://groups.google.com/forum/#!topic/edx-code/9xzRJFyQwn


GG


Thriftserver Beeline

2015-02-18 Thread gtinside
Hi ,

I created some Hive tables and am trying to list them through Beeline, but I am not
getting any results. I can list the tables through spark-sql. 

When I connect beeline, it starts up with following message :
Connecting to jdbc:hive2://tst001:10001
Enter username for jdbc:hive2://tst001:10001: 
Enter password for jdbc:hive2://tst001:10001: 
Connected to: Spark SQL (version 1.2.0)
Driver: null (version null)
Transaction isolation: TRANSACTION_REPEATABLE_READ

"show tables" in Beeline returns no results. 

Regards,
Gaurav



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Thriftserver-Beeline-tp21712.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Cody Koeninger
Can't you implement the

org.apache.spark.api.java.function.Function

interface and pass an instance of that to JdbcRDD.create ?
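
For readers hitting the same thing: a minimal sketch of that suggestion (the
class names are made up; the connection URL and SQL are the ones from
JavaJdbcRDDSuite). Move the anonymous classes into named, reusable classes and
hand instances of them to JdbcRDD.create:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

public class JdbcRddExample {

  // Named connection factory instead of an anonymous inner class.
  public static class DerbyConnectionFactory implements JdbcRDD.ConnectionFactory {
    public Connection getConnection() throws SQLException {
      return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
    }
  }

  // Named row mapper instead of an anonymous Function; easy to reuse and test.
  public static class FirstColumnMapper implements Function<ResultSet, Integer> {
    public Integer call(ResultSet r) throws Exception {
      return r.getInt(1);
    }
  }

  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("jdbc-rdd-example").setMaster("local[2]"));

    JavaRDD<Integer> rows = JdbcRDD.create(
        sc,
        new DerbyConnectionFactory(),
        "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
        1, 100, 1,
        new FirstColumnMapper());

    System.out.println(rows.count());
    sc.stop();
  }
}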

On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg dgoldenberg...@gmail.com
 wrote:

 Cody, you were right, I had a copy and paste snag where I ended up with a
 vanilla SparkContext rather than a Java one.  I also had to *not* use my
 function subclasses, rather just use anonymous inner classes for the
 Function stuff and that got things working. I'm fully following
 the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim.

 Is there a clean way to refactor out the custom Function classes such as
 the one for getting a db connection or mapping ResultSet data to your own
 POJO's rather than doing it all inline?


 On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Is sc there a SparkContext or a JavaSparkContext?  The compilation error
 seems to indicate the former, but JdbcRDD.create expects the latter

 On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 I have tried that as well, I get a compile error --

 [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for
 create(SparkContext,<anonymous
 ConnectionFactory>,String,int,int,int,<anonymous
 Function<ResultSet,Integer>>)

 The code is a copy and paste:

 JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
   sc,
   new JdbcRDD.ConnectionFactory() {
 public Connection getConnection() throws SQLException {
   return
 DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
 }
   },
   "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
   1, 100, 1,
   new Function<ResultSet, Integer>() {
 public Integer call(ResultSet r) throws Exception {
   return r.getInt(1);
 }
   }
 );

 The other thing I've tried was to define a static class locally for
 GetConnection and use the JdbcCreate constructor. This got around the
 compile issues but blew up at runtime with NoClassDefFoundError:
 scala/runtime/AbstractFunction0 !

 JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
 sc,
 (AbstractFunction0<Connection>) new DbConn(), // had to cast or a
 compile error
 SQL_QUERY,
 0L,
 1000L,
 10,
 new MapRow(),
 ROW_CLASS_TAG);
 // DbConn is defined as public static class DbConn extends
 AbstractFunction0<Connection> implements Serializable

 On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org
 wrote:

 That test I linked


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90

 is calling a static method JdbcRDD.create, not new JdbcRDD.  Is that
 what you tried doing?

 On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Thanks, Cody. Yes, I originally started off by looking at that but I
 get a compile error if I try and use that approach: constructor JdbcRDD in
 class JdbcRDD<T> cannot be applied to given types.  Not to mention that
 JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last
 argument).

 Wonder if it's a JDK version issue, I'm using 1.7.

 So I've got this, which doesn't compile

 JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
 new SparkContext(conf),
 new JdbcRDD.ConnectionFactory() {
 public Connection getConnection() throws SQLException {
 Connection conn = null;
 try {
 Class.forName(JDBC_DRIVER);
 conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
 } catch (ClassNotFoundException ex) {
 throw new RuntimeException("Error while loading JDBC driver.", ex);
 }
 return conn;
 }
 },
 "SELECT * FROM EMPLOYEES",
 0L,
 1000L,
 10,
 new Function<ResultSet, Row>() {
 public Row call(ResultSet r) throws Exception {
 return null; // have some actual logic here...
 }
 },
 scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));

 The other approach was mimicking the DbConnection class from this post:
 http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
 It got around the compilation issues but then I got the runtime
 error where Spark wouldn't recognize the db connection class as a
 scala.Function0.



 On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Take a look at


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java



 On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg 
 dgoldenberg...@gmail.com wrote:

 I'm reading data from a database using JdbcRDD, in Java, and I have
 an
 implementation of Function0<Connection> whose instance I supply as
 the
 'getConnection' parameter into the JdbcRDD constructor. Compiles
 fine.

 The definition of the class/function is as follows:

   public class GetDbConnection extends AbstractFunction0<Connection>
 implements Serializable

 where scala.runtime.AbstractFunction0 extends scala.Function0.

 At runtime, I get an exception as below. Does anyone have 

spark slave cannot execute without admin permission on windows

2015-02-18 Thread Judy Nash
Hi,

Is it possible to configure spark to run without admin permission on windows?

My current setup runs master & slave successfully with admin permission.
However, if I downgrade the permission level from admin to user, SparkPi fails with
the following exception on the slave node:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to
stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task
0.3 in stage 0.0 (TID 9, workernode0.jnashsparkcurr2.d10.internal.cloudapp.net)
: java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi$$anonfun$1

at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)

Upon investigation, it appears that the SparkPi jar under
spark_home\worker\appname\*.jar does not have the execute permission set, causing
Spark to be unable to find the class.

Advice would be very much appreciated.

Thanks,
Judy



Re: Spark and Spark Streaming code sharing best practice.

2015-02-18 Thread Arush Kharbanda
Monoids are useful in aggregations. Also, try to avoid anonymous functions;
factoring functions out of the Spark code allows them to be reused
(possibly between Spark and Spark Streaming).
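
As a trivial sketch of that idea with the Java API (the class and path names
below are illustrative, not from the original posts): the business logic lives
in one named Function, and both the batch job and the streaming job apply it.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SharedLogicExample {

  // The reusable piece of business logic: a named, serializable function
  // rather than an anonymous inner class buried inside one job.
  public static class Normalize implements Function<String, String> {
    public String call(String line) {
      return line.trim().toLowerCase();
    }
  }

  // Batch job: apply the function to a plain RDD.
  public static void runBatch(JavaSparkContext sc) {
    JavaRDD<String> lines = sc.parallelize(Arrays.asList(" Foo ", "BAR"));
    System.out.println(lines.map(new Normalize()).collect());
  }

  // Streaming job: the very same function applied to every micro-batch.
  public static void runStreaming(JavaStreamingContext ssc) {
    JavaDStream<String> lines = ssc.textFileStream("/tmp/streaming-input");
    lines.map(new Normalize()).print();
  }

  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("shared-logic").setMaster("local[2]"));
    runBatch(sc);

    JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5));
    runStreaming(ssc);
    ssc.start();
    ssc.awaitTermination();
  }
}

The same applies to reduce-side logic: a named Function2 used in reduceByKey can
be shared the same way, and that is where keeping the operation associative
(monoid-like) pays off, since the per-batch reductions in the stream compose.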

On Thu, Feb 19, 2015 at 6:56 AM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 Thanks Arush. I will check that out.

 On Wed, Feb 18, 2015 at 11:06 AM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 I find monoids pretty useful in this respect, basically separating out
 the logic in a monoid and then applying the logic to either a stream or a
 batch. A list of such practices could be really useful.

 On Thu, Feb 19, 2015 at 12:26 AM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hey,

 It seems pretty clear that one of the strengths of Spark is being able to
 share your code between your batch and streaming layers. Though, given that
 Spark Streaming uses DStreams (each being a sequence of RDDs) while Spark uses
 a single RDD, there might be some complexity associated with it.

 Of course, since a DStream is built from RDDs, one can just run the same
 code at the RDD granularity using DStream::foreachRDD. While this should
 work for map, I am not sure how that can work when it comes to the reduce
 phase, given that a group of keys spans multiple RDDs.

 One of the options is to change the dataset object on which a job works.
 For example, instead of passing an RDD to a class method, one passes a
 higher-level object (MetaRDD) that wraps around an RDD or DStream depending
 on the context. At this point the job calls its regular maps, reduces and so
 on, and the MetaRDD wrapper delegates accordingly.

 Just would like to know the official best practice from the spark
 community though.

 Thanks,




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com





-- 

[image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: spark slave cannot execute without admin permission on windows

2015-02-18 Thread Akhil Das
You don't need admin permission; just make sure all those jars
have execute permission (read/write access).

Thanks
Best Regards

On Thu, Feb 19, 2015 at 11:30 AM, Judy Nash judyn...@exchange.microsoft.com
 wrote:

  Hi,



 Is it possible to configure spark to run without admin permission on
 windows?



 My current setup runs master & slave successfully with admin permission.

 However, if I downgrade permission level from admin to user, SparkPi fails
 with the following exception on the slave node:

 Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
 Lost task 0.3 in stage 0.0 (TID 9,
 workernode0.jnashsparkcurr2.d10.internal.cloudapp.net)
 : java.lang.ClassNotFoundException:
 org.apache.spark.examples.SparkPi$$anonfun$1



 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

 at java.security.AccessController.doPrivileged(Native Method)

 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

 at java.lang.Class.forName0(Native Method)

 at java.lang.Class.forName(Class.java:270)



 Upon investigation, it appears that the SparkPi jar under
 spark_home\worker\appname\*.jar does not have the execute permission set,
 causing Spark to be unable to find the class.



 Advice would be very much appreciated.



 Thanks,

 Judy





Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-18 Thread Tom Walwyn
Thanks Imran, I'll try your suggestions.

I eventually got this to run by 'checkpointing' the joined RDD (according
to Akhil's suggestion) before performing the reduceBy, and then
checkpointing it again afterward. i.e.

 val rdd2 = rdd.join(rdd, numPartitions=1000)
  .map(fp => ((fp._2._1, fp._2._2), 1))
  .persist(MEMORY_AND_DISK_SER)
 val rdd3 = rdd2.reduceByKey((x,y) => x+y).persist(MEMORY_AND_DISK_SER)
 rdd3.count()

It takes a while, but at least it runs. So, I'll be sure to try your
suggestions for further speed-up.

Thanks again for your help.


On 18 February 2015 at 18:47, Imran Rashid iras...@cloudera.com wrote:

 Hi Tom,

 there are a couple of things you can do here to make this more efficient.
  First, I think you can replace your self-join with a groupByKey. On your
 example data set, this would give you

 (1, Iterable(2,3))
 (4, Iterable(3))

 this reduces the amount of data that needs to be shuffled, and that way
 you can produce all of your pairs just from the Iterable(2,3).

 Second, if you expect the same pairs to appear many times in your dataset,
 you might first want to replace them with a count.  E.g., if you start with

 (1,2)
 (1,2)
 (1,2)
 ...
 (1,2)
 (1,3)
 (1,3)
 (4,3)
 ...

 you might want to first convert that to get a count of each pair

 val pairCounts = rdd.map{x => (x,1)}.reduceByKey{_ + _}

 to give you something like:

 ((1,2), 145)
 ((1,3), 2)
 ((4,3), 982)
 ...

 and then with a little more massaging you can group by key and also keep
 the counts of each item:

 val groupedCounts: RDD[(Int, Iterable[(Int,Int)])] =
 pairCounts.map{case((key, value), counts) =>
   key -> (value, counts)
 }.groupByKey

 which would give you something like

 (1, Iterable((2,145), (3,2)))
 (4, Iterable((3,982)))


 hope this helps
 Imran

 On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote:

 Thanks for the reply, I'll try your suggestions.

 Apologies, in my previous post I was mistaken. rdd is actually an PairRDD
 of (Int, Int). I'm doing the self-join so I can count two things. First, I
 can count the number of times a value appears in the data set. Second I can
 count number of times values occur with the same key. For example, if I
 have the following:

 (1,2)
 (1,3)
 (4,3)

 Then joining with itself I get:

 (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
 (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
 (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
 (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
 (4,(3,3)) -> map -> ((3,3),1) _|

 Note that I want to keep the duplicates (2,2) and reflections.

 Rgds

 On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesn't need to keep everything up in memory.

 - Set your default Serializer to Kryo (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))

 - Enable rdd compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions=1000)
  .map(fp => ((fp._2._1, fp._2._2), 1))
  .reduceByKey((x,y) => x+y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occurring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd is about 3.6 million records, which has a
 size in memory of about 120MB, and results in a 'joined' RDD (before the
 reduceByKey stage) of around 460 million records, with a size in memory of
 about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 

Re: Is spark streaming +MlLib for online learning?

2015-02-18 Thread Reza Zadeh
This feature request is already being tracked:
https://issues.apache.org/jira/browse/SPARK-4981
Aiming for 1.4
Best,
Reza

On Wed, Feb 18, 2015 at 2:40 AM, mucaho muc...@yahoo.com wrote:

 Hi

 What is the general consensus/roadmap for implementing additional online /
 streamed trainable models?

 Apache Spark 1.2.1 currently supports streaming linear regression &
 clustering, although other streaming linear methods are planned according
 to
 the issue tracker.
 However, I can not find any details on the issue tracker about online
 training of a collaborative filter. Judging from  another mailing list
 discussion
 
 http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ce07aa61e-eeb9-4ded-be3e-3f04003e4...@storefront.be%3E
 
 incremental training should be possible for ALS. Any plans for the future?

 Regards
 mucaho



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-spark-streaming-MlLib-for-online-learning-tp19701p21698.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to pass parameters to a spark-jobserver Scala class?

2015-02-18 Thread Sasi
Thank you very much Vasu. Let me add some more points to my question. We are
developing a Java program for connecting spark-jobserver to Vaadin (a Java
framework). Following is the sample code I wrote for connecting both (the
code works fine):
URL url = null;
HttpURLConnection connection = null;
String strQueryUri =
"http://localhost:8090/jobs?appName=sparking&classPath=sparking.jobserver.GetOrCreateUsers&context=user-context";
url = new URL(strQueryUri);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Accept", "application/json");
InputStream isQueryJSON = connection.getInputStream();
LinkedHashMap<String, Object> queryMap = (LinkedHashMap<String, Object>)
getJSONKeyValue(isQueryJSON, null, "result");
String strJobId = (String) queryMap.get("jobId");

Can you suggest how to modify the above code to pass parameters (as we do with
*curl -d ...*) when running the job?
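
Something along these lines might work: the same HttpURLConnection, but with a
request body written the way curl -d sends it (the config key "input.string"
below is only an example; use whatever key your job actually reads):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class JobServerClient {
  public static void main(String[] args) throws Exception {
    String strQueryUri =
        "http://localhost:8090/jobs?appName=sparking"
        + "&classPath=sparking.jobserver.GetOrCreateUsers&context=user-context";

    // The POST body is what curl sends with -d; spark-jobserver hands it to the
    // job as its config. The key name here is illustrative.
    String body = "input.string = some value";

    HttpURLConnection connection =
        (HttpURLConnection) new URL(strQueryUri).openConnection();
    connection.setRequestMethod("POST");
    connection.setRequestProperty("Accept", "application/json");
    connection.setDoOutput(true);                    // allow sending a request body
    OutputStream out = connection.getOutputStream();
    out.write(body.getBytes(StandardCharsets.UTF_8));
    out.close();

    System.out.println("HTTP status: " + connection.getResponseCode());
    // The JSON response (jobId, status, ...) can then be read from
    // connection.getInputStream() exactly as in the code above.
  }
}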

Hope I make sense.

Sasi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-parameters-to-a-spark-jobserver-Scala-class-tp21671p21717.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to connect a mobile app (Android/iOS) with a Spark backend?

2015-02-18 Thread Ralph Bergmann | the4thFloor.eu
Hi,


I have dependency problems to use spark-core inside of a HttpServlet
(see other mail from me).

Maybe I'm wrong?!

What I want to do: I develop a mobile app (Android and iOS) and want to
connect them with Spark on backend side.

To do this I want to use Tomcat. The app uses https to ask Tomcat for
the needed data and Tomcat asks Spark.

Is this the right way? Or is there a better way to connect my mobile
apps with the Spark backend?

I hope that I'm not the first one who want to do this.



Ralph



signature.asc
Description: OpenPGP digital signature


Periodic Broadcast in Apache Spark Streaming

2015-02-18 Thread aanilpala
I am implementing a stream learner for text classification. There are some
single-valued parameters in my implementation that need to be updated as
new stream items arrive. For example, I want to change the learning rate as
new predictions are made. However, I doubt that there is a way to broadcast
variables after the initial broadcast. So what happens if I need to
broadcast a variable every time I update it? If there is a way to do it or a
workaround for what I want to accomplish in Spark Streaming, I'd be happy to
hear about it.
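
One workaround pattern that gets suggested for this (a sketch only, not a
built-in Spark feature; the class below is made up for illustration) is to keep
the value on the driver and re-create the Broadcast inside foreachRDD, which
runs on the driver at every batch interval:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

// Driver-side helper: lazily (re)creates a Broadcast whenever the value changes.
public class RefreshableBroadcast implements java.io.Serializable {
  private double latestValue;
  private transient Broadcast<Double> current;

  public RefreshableBroadcast(double initialValue) {
    this.latestValue = initialValue;
  }

  // Call on the driver (e.g. at the top of a foreachRDD block), never inside a task.
  public synchronized Broadcast<Double> get(JavaSparkContext sc) {
    if (current == null) {
      current = sc.broadcast(latestValue);
    }
    return current;
  }

  // Update the value and drop the old broadcast; the next get() re-broadcasts it.
  public synchronized void update(double newValue) {
    latestValue = newValue;
    if (current != null) {
      current.unpersist();  // remove the stale copies cached on the executors
      current = null;
    }
  }
}

Inside stream.foreachRDD(...) you would call get(sc) once per batch, capture the
returned Broadcast in the closures of that micro-batch, and read its value() on
the executors; calling update(...) on the driver makes the next batch
re-broadcast the new learning rate.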

Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Periodic-Broadcast-in-Apache-Spark-Streaming-tp21703.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Sean Owen
At some level, enough RDDs creates hundreds of thousands of tiny
partitions of data each of which creates a task for each stage. The
raw overhead of all the message passing can slow things down a lot. I
would not design something to use an RDD per key. You would generally
key by some value you want to divide and filter on, and then use a
*ByKey operation to do your work.

You don't work with individual RDDs this way, but usually that's good
news. You usually have a lot more flexibility operating just in pure
Java / Scala to do whatever you need inside your function.
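
To make that concrete for the stats() case mentioned below: per-key statistics
can be computed over a single pair RDD with aggregateByKey, reusing StatCounter
(the accumulator behind JavaDoubleRDD.stats()). A rough Java sketch with made-up
sample data:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.util.StatCounter;

import scala.Tuple2;

public class PerKeyStats {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("per-key-stats").setMaster("local[2]"));

    JavaPairRDD<String, Double> data = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Double>("a", 1.0),
        new Tuple2<String, Double>("a", 3.0),
        new Tuple2<String, Double>("b", 10.0)));

    // StatCounter carries count, mean, stdev, min and max, so the per-key
    // result gives the same information as stats() on a whole JavaDoubleRDD.
    JavaPairRDD<String, StatCounter> statsByKey = data.aggregateByKey(
        new StatCounter(),
        new Function2<StatCounter, Double, StatCounter>() {
          public StatCounter call(StatCounter acc, Double v) { return acc.merge(v); }
        },
        new Function2<StatCounter, StatCounter, StatCounter>() {
          public StatCounter call(StatCounter a, StatCounter b) { return a.merge(b); }
        });

    System.out.println(statsByKey.collectAsMap());
    sc.stop();
  }
}

Both functions are allowed to modify and return their first argument, so this
stays cheap and never materializes a full group per key.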

On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá
juan.rodriguez.hort...@gmail.com wrote:
 Hi Paweł,

 Thanks a lot for your answer. I finally got the program to work by using
 aggregateByKey, but I was wondering why creating thousands of RDDs doesn't
 work. I think that could be interesting for using methods that work on RDDs
 like for example JavaDoubleRDD.stats() (
 http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29).
 If the groups are small then I can chain groupBy(), collect(), parallelize()
 and stats(), but that is quite inefficient because it implies moving data to
 and from the driver, and also doesn't scale to big groups; on the other hand
 if I use aggregateByKey or a similar function then I cannot use stats() so I
 have to reimplement it. In general I was looking for a way to reuse other
 functions that I have that work on RDDs, for using them on groups of data in
 a RDD, because I don't see a how to directly apply them to each of the
 groups in a pair RDD.

 Again, thanks a lot for your answer,

 Greetings,

 Juan Rodriguez




 2015-02-18 14:56 GMT+01:00 Paweł Szulc paul.sz...@gmail.com:

 Maybe you can omit using grouping all together with groupByKey? What is
 your next step after grouping elements by key? Are you trying to reduce
 values? If so then I would recommend using some reducing functions like for
 example reduceByKey or aggregateByKey. Those will first reduce value for
 each key locally on each node before doing actual IO over the network. There
 will also be no grouping phase so you will not run into memory issues.

 Please let me know if that helped

 Pawel Szulc
 @rabbitonweb
 http://www.rabbitonweb.com


 On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá
 juan.rodriguez.hort...@gmail.com wrote:

 Hi,

 I'm writing a Spark program where I want to divide a RDD into different
 groups, but the groups are too big to use groupByKey. To cope with that,
 since I know in advance the list of keys for each group, I build a map from
 the keys to the RDDs that result from filtering the input RDD to get the
 records for the corresponding key. This works when I have a small number of
 keys, but for big number of keys (tens of thousands) the execution gets
 stuck, without issuing any new Spark stage. I suspect the reason is that the
 Spark scheduler is not able to handle so many RDDs. Does it make sense? I'm
 rewriting the program to use a single RDD of pairs, with cached partions,
 but I wanted to be sure I understand the problem here.

 Thanks a lot in advance,

 Greetings,

 Juan Rodriguez




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Dmitry Goldenberg
Are you proposing I downgrade Solrj's httpclient dependency to be on par with 
that of Spark/Hadoop? Or upgrade Spark/Hadoop's httpclient to the latest?

Solrj has to stay with its selected version. I could try and rebuild Spark with 
the latest httpclient but I've no idea what effects that may cause on Spark.

Sent from my iPhone

 On Feb 18, 2015, at 1:37 AM, Arush Kharbanda ar...@sigmoidanalytics.com 
 wrote:
 
 Hi
 
 Did you try to make maven pick the latest version
 
 http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management
 
 That way solrj won't cause any issue, you can try this and check if the part 
 of your code where you access HDFS works fine?
 
 
 
 On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com 
 wrote:
 I'm getting the below error when running spark-submit on my class. This class
 has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ
 4.10.3 from within the class.
 
 This is in conflict with the older version, HttpClient 3.1 that's a
 dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).
 
 I've tried setting spark.files.userClassPathFirst to true in SparkConf in my
 program, also setting it to true in  $SPARK-HOME/conf/spark-defaults.conf as
 
 spark.files.userClassPathFirst true
 
 No go, I'm still getting the error, as below. Is there anything else I can
 try? Are there any plans in Spark to support multiple class loaders?
 
 Exception in thread main java.lang.NoSuchMethodError:
 org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
 at
 org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
 at
 org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
 at
 org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
 at
 org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
 at
 org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
 at
 org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
 at
 org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
 at
 org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)
 ...
 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 -- 
 
 
 Arush Kharbanda || Technical Teamlead
 
 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: How to integrate hive on spark

2015-02-18 Thread Arush Kharbanda
Hi

Did you try these steps.


https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started

Thanks
Arush

On Wed, Feb 18, 2015 at 7:20 PM, sandeepvura sandeepv...@gmail.com wrote:

 Hi ,

 I am new to sparks.I had installed spark on 3 node cluster.I would like to
 integrate hive on spark .

 can anyone please help me on this,

 Regards,
 Sandeep.v



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-integrate-hive-on-spark-tp21702.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 

[image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Creating RDDs from within foreachPartition() [Spark-Streaming]

2015-02-18 Thread Sean Owen
You can't use RDDs inside RDDs. RDDs are managed from the driver, and
functions like foreachRDD execute things on the remote executors.

You can write code to simply directly save whatever you want to ES.
There is not necessarily a need to use RDDs for that.
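
For example (a rough sketch, not a drop-in solution: the endpoint URL is a
placeholder and there is no batching, retry or error handling), each partition
can open a plain HTTP connection and post its records itself, so no SparkContext
and no nested RDDs are needed on the executors:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;

public class DirectWriter {

  // Runs on the executors: each partition posts its own records.
  public static class PostPartition implements VoidFunction<Iterator<String>> {
    private final String endpoint;

    public PostPartition(String endpoint) { this.endpoint = endpoint; }

    public void call(Iterator<String> records) throws Exception {
      while (records.hasNext()) {
        String json = records.next();
        HttpURLConnection conn =
            (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        OutputStream out = conn.getOutputStream();
        out.write(json.getBytes(StandardCharsets.UTF_8));
        out.close();
        conn.getResponseCode();  // force the request; add error handling as needed
        conn.disconnect();
      }
    }
  }

  // Wire it into the stream; the URL here is only a placeholder.
  public static void attach(JavaDStream<String> stream) {
    stream.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) {
        rdd.foreachPartition(new PostPartition("http://localhost:9200/myindex/mytype"));
        return null;
      }
    });
  }
}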

On Wed, Feb 18, 2015 at 11:36 AM, t1ny wbr...@gmail.com wrote:
 Hi all,

 I am trying to create RDDs from within /rdd.foreachPartition()/ so I can
 save these RDDs to ElasticSearch on the fly :

 stream.foreachRDD(rdd => {
 rdd.foreachPartition {
   iterator => {
 val sc = rdd.context
 iterator.foreach {
   case (cid, sid, ts) => {

 [...]

 sc.makeRDD(...).saveToEs(...) <- *throws a
 NullPointerException (sc is null)*
   }
 }
   }
 }
 }

 Unfortunately this doesn't work as I can't seem to be able to access the
 SparkContext from anywhere within /foreachPartition()/. The code above
 throws a NullPointerException, but if I change it to ssc.sc.makeRDD (where
 ssc is the StreamingContext object created in the main function, outside of
 /foreachPartition/) then I get a NotSerializableException.

 What is the correct way to do this ?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Creating-RDDs-from-within-foreachPartition-Spark-Streaming-tp21700.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to connect a mobile app (Android/iOS) with a Spark backend?

2015-02-18 Thread Arush Kharbanda
I am running Spark jobs behind Tomcat. We didn't face any issues, but for us
the user base is very small.

The possible blockers could be:
1. If there are many users of the system, jobs might have to wait; you
might want to think about the kind of scheduling you want to do.
2. Again, if the number of users is a bit high, Tomcat doesn't scale really
well (not sure how much of a blocker that is).

Thanks
Arush

On Wed, Feb 18, 2015 at 6:41 PM, Ralph Bergmann | the4thFloor.eu 
ra...@the4thfloor.eu wrote:

 Hi,


 I have dependency problems to use spark-core inside of a HttpServlet
 (see other mail from me).

 Maybe I'm wrong?!

 What I want to do: I develop a mobile app (Android and iOS) and want to
 connect them with Spark on backend side.

 To do this I want to use Tomcat. The app uses https to ask Tomcat for
 the needed data and Tomcat asks Spark.

 Is this the right way? Or is there a better way to connect my mobile
 apps with the Spark backend?

 I hope that I'm not the first one who want to do this.



 Ralph




-- 

[image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Juan Rodríguez Hortalá
Hi Paweł,

Thanks a lot for your answer. I finally got the program to work by using
aggregateByKey, but I was wondering why creating thousands of RDDs doesn't
work. I think that could be interesting for using methods that work on RDDs
like for example JavaDoubleRDD.stats() (
http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29).
If the groups are small then I can chain groupBy(), collect(),
parallelize() and stats(), but that is quite inefficient because it implies
moving data to and from the driver, and also doesn't scale to big groups;
on the other hand if I use aggregateByKey or a similar function then I
cannot use stats() so I have to reimplement it. In general I was looking
for a way to reuse other functions that I have that work on RDDs, for using
them on groups of data in a RDD, because I don't see a how to directly
apply them to each of the groups in a pair RDD.

Again, thanks a lot for your answer,

Greetings,

Juan Rodriguez




2015-02-18 14:56 GMT+01:00 Paweł Szulc paul.sz...@gmail.com:

 Maybe you can omit using grouping all together with groupByKey? What is
 your next step after grouping elements by key? Are you trying to reduce
 values? If so then I would recommend using some reducing functions like for
 example reduceByKey or aggregateByKey. Those will first reduce value for
 each key locally on each node before doing actual IO over the network.
 There will also be no grouping phase so you will not run into memory issues.

 Please let me know if that helped

 Pawel Szulc
 @rabbitonweb
 http://www.rabbitonweb.com


 On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá 
 juan.rodriguez.hort...@gmail.com wrote:

 Hi,

 I'm writing a Spark program where I want to divide a RDD into different
 groups, but the groups are too big to use groupByKey. To cope with that,
 since I know in advance the list of keys for each group, I build a map from
 the keys to the RDDs that result from filtering the input RDD to get the
 records for the corresponding key. This works when I have a small number of
 keys, but for big number of keys (tens of thousands) the execution gets
 stuck, without issuing any new Spark stage. I suspect the reason is that
 the Spark scheduler is not able to handle so many RDDs. Does it make sense?
 I'm rewriting the program to use a single RDD of pairs, with cached
 partions, but I wanted to be sure I understand the problem here.

 Thanks a lot in advance,

 Greetings,

 Juan Rodriguez





Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Emre Sevinc
Hello Dmitry,

I had almost the same problem and solved it by using version 4.0.0 of SolrJ:

<dependency>
  <groupId>org.apache.solr</groupId>
  <artifactId>solr-solrj</artifactId>
  <version>4.0.0</version>
</dependency>

In my case, I was lucky that version 4.0.0 of SolrJ had all the
functionality I needed.

--
Emre Sevinç
http://www.bigindustries.be/



On Wed, Feb 18, 2015 at 4:39 PM, Dmitry Goldenberg dgoldenberg...@gmail.com
 wrote:

 I think I'm going to have to rebuild Spark with commons.httpclient.version
 set to 4.3.1 which looks to be the version chosen by Solrj, rather than the
 4.2.6 that Spark's pom mentions. Might work.

 On Wed, Feb 18, 2015 at 1:37 AM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 Hi

 Did you try to make maven pick the latest version


 http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management

 That way solrj won't cause any issue, you can try this and check if the
 part of your code where you access HDFS works fine?



 On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 I'm getting the below error when running spark-submit on my class. This
 class
 has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ
 4.10.3 from within the class.

 This is in conflict with the older version, HttpClient 3.1 that's a
 dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).

 I've tried setting spark.files.userClassPathFirst to true in SparkConf
 in my
 program, also setting it to true in
 $SPARK-HOME/conf/spark-defaults.conf as

 spark.files.userClassPathFirst true

 No go, I'm still getting the error, as below. Is there anything else I
 can
 try? Are there any plans in Spark to support multiple class loaders?

 Exception in thread main java.lang.NoSuchMethodError:

 org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
 at

 org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
 at

 org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
 at

 org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
 at

 org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
 at

 org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)
 ...





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com





-- 
Emre Sevinc


Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Dmitry Goldenberg
Thank you, Emre. It seems solrj still depends on HttpClient 4.1.3; would
that not collide with Spark/Hadoop's default dependency on HttpClient set
to 4.2.6? If that's the case that might just solve the problem.

Would Solrj 4.0.0 work with the latest Solr, 4.10.3?

On Wed, Feb 18, 2015 at 10:50 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Dmitry,

 I had almost the same problem and solved it by using version 4.0.0 of
 SolrJ:

 <dependency>
   <groupId>org.apache.solr</groupId>
   <artifactId>solr-solrj</artifactId>
   <version>4.0.0</version>
  </dependency>

 In my case, I was lucky that version 4.0.0 of SolrJ had all the
 functionality I needed.

 --
 Emre Sevinç
 http://www.bigindustries.be/



 On Wed, Feb 18, 2015 at 4:39 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 I think I'm going to have to rebuild Spark with
 commons.httpclient.version set to 4.3.1 which looks to be the version
 chosen by Solrj, rather than the 4.2.6 that Spark's pom mentions. Might
 work.

 On Wed, Feb 18, 2015 at 1:37 AM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 Hi

 Did you try to make maven pick the latest version


 http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management

 That way solrj won't cause any issue, you can try this and check if the
 part of your code where you access HDFS works fine?



 On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 I'm getting the below error when running spark-submit on my class. This
 class
 has a transitive dependency on HttpClient v.4.3.1 since I'm calling
 SolrJ
 4.10.3 from within the class.

 This is in conflict with the older version, HttpClient 3.1 that's a
 dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).

 I've tried setting spark.files.userClassPathFirst to true in SparkConf
 in my
 program, also setting it to true in
 $SPARK-HOME/conf/spark-defaults.conf as

 spark.files.userClassPathFirst true

 No go, I'm still getting the error, as below. Is there anything else I
 can
 try? Are there any plans in Spark to support multiple class loaders?

 Exception in thread main java.lang.NoSuchMethodError:

 org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
 at

 org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
 at

 org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
 at

 org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
 at

 org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
 at

 org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)
 ...





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com





 --
 Emre Sevinc



Re: Why cached RDD is recomputed again?

2015-02-18 Thread shahab
Thanks Sean, but I don't think that fitting into memory is the issue, because:
1- I can see in the UI that 100% of the RDD is cached (moreover, the RDD is
quite small, 100 MB, while the worker has 1.5 GB)
2- I also tried MEMORY_AND_DISK, but absolutely no difference!

Probably I have messed up somewhere else!
Do you have any other idea where I should look for the cause?

best,
/Shahab

On Wed, Feb 18, 2015 at 4:22 PM, Sean Owen so...@cloudera.com wrote:

 The most likely explanation is that you wanted to put all the
 partitions in memory and they don't all fit. Unless you asked to
 persist to memory or disk, some partitions will simply not be cached.

 Consider using MEMORY_AND_DISK persistence.

 This can also happen if blocks were lost due to node failure.

 On Wed, Feb 18, 2015 at 3:19 PM, shahab shahab.mok...@gmail.com wrote:
  Hi,
 
  I have a cached RDD (I can see in UI that it is cached), but when I use
 this
  RDD , I can see that the RDD is partially recomputed (computed) again.
 It is
  partially because I can see in the UI that some tasks are skipped (have a
 look
  at the attached figure).
 
  Now the question is 1: what causes a cached RDD to be recomputed again?
 and
  why somes tasks are skipped and some not??
 
  best,
  /Shahab
 
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org



[no subject]

2015-02-18 Thread Luca Puggini



Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Emre Sevinc
On Wed, Feb 18, 2015 at 4:54 PM, Dmitry Goldenberg dgoldenberg...@gmail.com
 wrote:

 Thank you, Emre. It seems solrj still depends on HttpClient 4.1.3; would
 that not collide with Spark/Hadoop's default dependency on HttpClient set
 to 4.2.6? If that's the case that might just solve the problem.

 Would Solrj 4.0.0 work with the latest Solr, 4.10.3?


In my case, it worked; I mean I was trying to send some documents to the
latest version of Solr server (v4.10.3), and using v4.0.0 of SolrJ worked
without any problems so far. I couldn't find any other way to deal with
this old httpclient dependency problem in Spark.

--
Emre Sevinç
http://www.bigindustries.be/


Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Dmitry Goldenberg
Thanks, Emre! Will definitely try this.

On Wed, Feb 18, 2015 at 11:00 AM, Emre Sevinc emre.sev...@gmail.com wrote:


 On Wed, Feb 18, 2015 at 4:54 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Thank you, Emre. It seems solrj still depends on HttpClient 4.1.3; would
 that not collide with Spark/Hadoop's default dependency on HttpClient set
 to 4.2.6? If that's the case that might just solve the problem.

 Would Solrj 4.0.0 work with the latest Solr, 4.10.3?


 In my case, it worked; I mean I was trying to send some documents to the
 latest version of Solr server (v4.10.3), and using v4.0.0 of SolrJ worked
 without any problems so far. I couldn't find any other way to deal with
 this old httpclient dependency problem in Spark.

 --
 Emre Sevinç
 http://www.bigindustries.be/





Re: How to connect a mobile app (Android/iOS) with a Spark backend?

2015-02-18 Thread Sean Owen
This does not sound like a Spark problem -- doesn't even necessarily
sound like a distributed problem. Are you of a scale where building
simple logic in a web tier that queries a NoSQL / SQL database doesn't
work?

If you are at such a scale, then it sounds like you're describing a
very high volume of small, very low latency queries. Spark is not
designed for that. However it could do some crunching in the
background and feed a serving layer technology like a NoSQL store with
recent results, for example.

On Wed, Feb 18, 2015 at 3:23 PM, Ralph Bergmann | the4thFloor.eu
ra...@the4thfloor.eu wrote:
 Hi,


 Am 18.02.15 um 15:58 schrieb Sean Owen:
 That said, it depends a lot on what you are trying to do. What are you
 trying to do? You just say you're connecting to spark.

 There are 2 tasks I want to solve with Spark.

 1) The user opens the mobile app. The app sends a ping to the backend.
 When this happens the backend has to collect some data from other servers
 via HTTP and has to do some stuff with this data.

 2) The mobile app can download this data from 1. In this case the
 backend has to find/create the right data (depending on user location,
 rating, etc.)


 Ralph


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Learning GraphX Questions

2015-02-18 Thread Matthew Bucci
Thanks for all the responses so far! I have started to understand the
system more, but I just had another question while I was going along. Is
there a way to check the individual partitions of an RDD? For example, if I
had a graph with vertices a,b,c,d and it was split into 2 partitions could
I check which vertices belonged in partition 1 and partition 2?
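
One way to inspect this, sketched here with the Java RDD API and a plain RDD of
ids standing in for graph.vertices, is mapPartitionsWithIndex, which tags every
element with the index of the partition it lives in:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

public class PartitionInspector {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("partition-inspector").setMaster("local[2]"));

    // Stand-in for graph.vertices: just the ids a, b, c, d in 2 partitions.
    JavaRDD<String> vertices = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

    // Pair every element with the index of the partition it belongs to.
    JavaRDD<Tuple2<Integer, String>> tagged = vertices.mapPartitionsWithIndex(
        new Function2<Integer, Iterator<String>, Iterator<Tuple2<Integer, String>>>() {
          public Iterator<Tuple2<Integer, String>> call(Integer idx, Iterator<String> it) {
            List<Tuple2<Integer, String>> out = new ArrayList<Tuple2<Integer, String>>();
            while (it.hasNext()) {
              out.add(new Tuple2<Integer, String>(idx, it.next()));
            }
            return out.iterator();
          }
        },
        true);

    System.out.println(tagged.collect());  // e.g. [(0,a), (0,b), (1,c), (1,d)]
    sc.stop();
  }
}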

Thank You,
Matthew Bucci

On Fri, Feb 13, 2015 at 10:58 PM, Ankur Dave ankurd...@gmail.com wrote:

 At 2015-02-13 12:19:46 -0800, Matthew Bucci mrbucci...@gmail.com wrote:
  1) How do you actually run programs in GraphX? At the moment I've been
 doing
  everything live through the shell, but I'd obviously like to be able to
 work
  on it by writing and running scripts.

 You can create your own projects that build against Spark and GraphX
 through a Maven dependency [1], then run those applications using the
 bin/spark-submit script included with Spark [2].

 These guides assume you already know how to do this using your preferred
 build tool (SBT or Maven). In short, here's how to do it with SBT:

 1. Install SBT locally (`brew install sbt` on OS X).

 2. Inside your project directory, create a build.sbt file listing Spark
 and GraphX as a dependency, as in [3].

 3. Run `sbt package` in a shell.

 4. Pass the JAR in your_project_dir/target/scala-2.10/ to bin/spark-submit.

 [1]
 http://spark.apache.org/docs/latest/programming-guide.html#linking-with-spark
 [2] http://spark.apache.org/docs/latest/submitting-applications.html
 [3] https://gist.github.com/ankurdave/1fb7234d8affb3a2e4f4

  2) Is there a way to check the status of the partitions of a graph? For
  example, I want to determine for starters if the number of partitions
  requested are always made, like if I ask for 8 partitions but only have 4
  cores what happens?

 You can look at `graph.vertices` and `graph.edges`, which are both RDDs,
 so you can do for example: graph.vertices.partitions

  3) Would I be able to partition by vertex instead of edges, even if I
 had to
  write it myself? I know partitioning by edges is favored in a majority of
  the cases, but for the sake of research I'd like to be able to do both.

 If you pass PartitionStrategy.EdgePartition1D, this will partition edges
 by their source vertices, so all edges with the same source will be
 co-partitioned, and the communication pattern will be similar to
 vertex-partitioned (edge-cut) systems like Giraph.

  4) Is there a better way to time processes outside of using built-in unix
  timing through the logs or something?

 I think the options are Unix timing, log file timestamp parsing, looking
 at the web UI, or writing timing code within your program
 (System.currentTimeMillis and System.nanoTime).

 Ankur



Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Dmitry Goldenberg
I think I'm going to have to rebuild Spark with commons.httpclient.version
set to 4.3.1 which looks to be the version chosen by Solrj, rather than the
4.2.6 that Spark's pom mentions. Might work.

On Wed, Feb 18, 2015 at 1:37 AM, Arush Kharbanda ar...@sigmoidanalytics.com
 wrote:

 Hi

 Did you try to make maven pick the latest version


 http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management

 That way solrj won't cause any issue, you can try this and check if the
 part of your code where you access HDFS works fine?



 On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 I'm getting the below error when running spark-submit on my class. This
 class
 has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ
 4.10.3 from within the class.

 This is in conflict with the older version, HttpClient 3.1 that's a
 dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).

 I've tried setting spark.files.userClassPathFirst to true in SparkConf in
 my
 program, also setting it to true in  $SPARK-HOME/conf/spark-defaults.conf
 as

 spark.files.userClassPathFirst true

 No go, I'm still getting the error, as below. Is there anything else I can
 try? Are there any plans in Spark to support multiple class loaders?

 Exception in thread main java.lang.NoSuchMethodError:

 org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
 at

 org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
 at

 org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
 at

 org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
 at

 org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
 at

 org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)
 ...





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com



Re:Is Databricks log analysis reference app only based on Java API

2015-02-18 Thread Todd
sorry for the noise. I have found it..
At 2015-02-18 23:34:40, Todd bit1...@163.com wrote:

Looks like the log analysis reference app provided by Databricks at
https://github.com/databricks/reference-apps only has a Java API?
I'd like to see a Scala version.



Is Databricks log analysis reference app only based on Java API

2015-02-18 Thread Todd
Looks like the log analysis reference app provided by Databricks at
https://github.com/databricks/reference-apps only has a Java API?
I'd like to see a Scala version.



How to integrate hive on spark

2015-02-18 Thread sandeepvura
Hi ,

I am new to sparks.I had installed spark on 3 node cluster.I would like to
integrate hive on spark .

can anyone please help me on this,

Regards,
Sandeep.v



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-integrate-hive-on-spark-tp21702.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there a limit to the number of RDDs in a Spark context?

2015-02-18 Thread Paweł Szulc
Maybe you can omit using grouping all together with groupByKey? What is
your next step after grouping elements by key? Are you trying to reduce
values? If so then I would recommend using some reducing functions like for
example reduceByKey or aggregateByKey. Those will first reduce value for
each key locally on each node before doing actual IO over the network.
There will also be no grouping phase so you will not run into memory issues.

Please let me know if that helped

Pawel Szulc
@rabbitonweb
http://www.rabbitonweb.com


On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá 
juan.rodriguez.hort...@gmail.com wrote:

 Hi,

 I'm writing a Spark program where I want to divide a RDD into different
 groups, but the groups are too big to use groupByKey. To cope with that,
 since I know in advance the list of keys for each group, I build a map from
 the keys to the RDDs that result from filtering the input RDD to get the
 records for the corresponding key. This works when I have a small number of
 keys, but for big number of keys (tens of thousands) the execution gets
 stuck, without issuing any new Spark stage. I suspect the reason is that
 the Spark scheduler is not able to handle so many RDDs. Does it make sense?
 I'm rewriting the program to use a single RDD of pairs, with cached
 partions, but I wanted to be sure I understand the problem here.

 Thanks a lot in advance,

 Greetings,

 Juan Rodriguez



Re: How to connect a mobile app (Android/iOS) with a Spark backend?

2015-02-18 Thread Sean Owen
Although you can do lots of things, I don't think Spark is something
you should think of as a synchronous, real-time query API. So, somehow
trying to use it directly from a REST API is probably not the best
architecture.

That said, it depends a lot on what you are trying to do. What are you
trying to do? You just say you're connecting to spark.

On Wed, Feb 18, 2015 at 1:11 PM, Ralph Bergmann | the4thFloor.eu
ra...@the4thfloor.eu wrote:
 Hi,


 I have dependency problems to use spark-core inside of a HttpServlet
 (see other mail from me).

 Maybe I'm wrong?!

 What I want to do: I develop a mobile app (Android and iOS) and want to
 connect them with Spark on backend side.

 To do this I want to use Tomcat. The app uses https to ask Tomcat for
 the needed data and Tomcat asks Spark.

 Is this the right way? Or is there a better way to connect my mobile
 apps with the Spark backend?

 I hope that I'm not the first one who want to do this.



 Ralph


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-18 Thread Imran Rashid
so if you only change this line:

https://gist.github.com/emres/0fb6de128baea099e741#file-mymoduledriver-java-L137

to

json.print()

it processes 16 files instead?  I am totally perplexed.  My only
suggestions to help debug are
(1) see what happens when you get rid of MyModuleWorker completely --
change MyModuleDriver#process to just
inStream.print()
and see what happens

(2) stick a bunch of printlns into MyModuleWorker#call

(3) turn on DEBUG logging
for org.apache.spark.streaming.dstream.FileInputDStream
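
With the default log4j setup that is typically a single line in
conf/log4j.properties, for example:

log4j.logger.org.apache.spark.streaming.dstream.FileInputDStream=DEBUG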

my gut instinct is that something else is flaky about the file input stream
(e.g., it makes some assumptions about the file system which maybe aren't
valid in your case; it has a bunch of caveats), and that it has just
happened to work sometimes with your foreachRDD and failed sometimes with
print.

Sorry I am not a lot of help in this case, hope this leads you down the
right track or somebody else can help out.

Imran


On Wed, Feb 18, 2015 at 2:28 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Imran,

 (a) I know that all 20 files are processed when I use foreachRDD, because
 I can see the processed files in the output directory. (My application
 logic writes them to an output directory after they are processed, *but*
 that writing operation does not happen in foreachRDD, below you can see the
 URL that includes my code and clarifies this).

 (b) I know only 16 files are processed because in the output directory I
 see only 16 files processed. I wait for minutes and minutes and no more
 files appear in the output directory. When I see only 16 files are
 processed and Spark Streaming went to the mode of idly watching the input
 directory, and then if I copy a few more files, they are also processed.

 (c) Sure, you can see part of my code in the following gist:
 https://gist.github.com/emres/0fb6de128baea099e741
  It might seem a little convoluted at first, because my application is
 divided into two classes, a Driver class (setting up things and
 initializing them), and a Worker class (that implements the core
 functionality). I've also put the relevant methods from the my utility
 classes for completeness.

 I am as perplexed as you are as to why forcing the output via foreachRDD
 ended up in different behaviour compared to simply using print() method.

 Kind regards,
 Emre



 On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras...@cloudera.com
 wrote:

 Hi Emre,

 there shouldn't be any difference in which files get processed w/ print()
 vs. foreachRDD().  In fact, if you look at the definition of print(), it is
 just calling foreachRDD() underneath.  So there is something else going on
 here.

 We need a little more information to figure out exactly what is going on.
  (I think Sean was getting at the same thing ...)

 (a) how do you know that when you use foreachRDD, all 20 files get
 processed?

 (b) How do you know that only 16 files get processed when you print()? Do
 you know the other files are being skipped, or maybe they are just stuck
 somewhere?  eg., suppose you start w/ 20 files, and you see 16 get
 processed ... what happens after you add a few more files to the
 directory?  Are they processed immediately, or are they never processed
 either?

 (c) Can you share any more code of what you are doing to the dstreams
 *before* the print() / foreachRDD()?  That might give us more details about
 what the difference is.

 I can't see how .count().print() would be different than just print(),
 but maybe I am missing something also.

 Imran

 On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Sean,

 In this case, I've been testing the code on my local machine and using
 Spark locally, so I all the log output was available on my terminal. And
 I've used the .print() method to have an output operation, just to force
 Spark execute.

 And I was not using foreachRDD, I was only using print() method on a
 JavaDStream object, and it was working fine for a few files, up to 16 (and
 without print() it did not do anything because there were no output
 operations).

 To sum it up, in my case:

  - Initially, use .print() and no foreachRDD: processes up to 16 files
 and does not do anything for the remaining 4.
  - Remove .print() and use foreachRDD: processes all of the 20 files.

 Maybe, as in Akhil Das's suggestion, using .count.print() might also
 have fixed my problem, but I'm satisfied with foreachRDD approach for now.
 (Though it is still a mystery to me why using .print() had a difference,
 maybe my mental model of Spark is wrong, I thought no matter what output
 operation I used, the number of files processed by Spark would be
 independent of that because the processing is done in a different method,
 .print() is only used to force Spark execute that processing, am I wrong?).

 --
 Emre


 On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote:

 Materialization shouldn't be relevant. The collect by itself doesn't
 let you detect whether it 

Re: Why cached RDD is recomputed again?

2015-02-18 Thread Sean Owen
The most likely explanation is that you wanted to put all the
partitions in memory and they don't all fit. Unless you asked to
persist to memory or disk, some partitions will simply not be cached.

Consider using MEMORY_AND_DISK persistence.

This can also happen if blocks were lost due to node failure.
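
A minimal sketch of that, assuming the cached RDD is simply called rdd
(MEMORY_AND_DISK spills partitions that don't fit in memory to local disk
instead of dropping them):

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY): partitions that
// don't fit in memory are not stored at all and get recomputed when needed.
// MEMORY_AND_DISK writes the overflow partitions to local disk instead.
val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK)
cached.count()  // materialize once; later actions reuse the persisted partitions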

On Wed, Feb 18, 2015 at 3:19 PM, shahab shahab.mok...@gmail.com wrote:
 Hi,

 I have a cached RDD (I can see in UI that it is cached), but when I use this
 RDD , I can see that the RDD is partially recomputed (computed) again. It is
 partially because I can see in UI that some task are skipped (have a look
 at the attached figure).

 Now the questions are: 1) what causes a cached RDD to be recomputed again? and
 2) why are some tasks skipped and some not?

 best,
 /Shahab



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to connect a mobile app (Android/iOS) with a Spark backend?

2015-02-18 Thread Ralph Bergmann | the4thFloor.eu
Hi,


Am 18.02.15 um 15:58 schrieb Sean Owen:
 That said, it depends a lot on what you are trying to do. What are you
 trying to do? You just say you're connecting to spark.

There are 2 tasks I want to solve with Spark.

1) The user opens the mobile app. The app sends a ping to the backend.
When this happens the backend has to collect some data from other servers
via http and do some stuff with this data.

2) The mobile app can download this data from 1. In this case the
backend has to find/create the right data (depending on user location,
rating, etc.)


Ralph





Re: WARN from Similarity Calculation

2015-02-18 Thread Debasish Das
I am still debugging it but I believe if m% of users have unusually large
columns and the RDD partitioner on RowMatrix is hashPartitioner then due to
the basic algorithm without sampling, some partitions can end up with an
unusually large number of keys...

If my debug shows that I will add a custom partitioner for RowMatrix (will
be useful for sparse vectors, for dense vector it does not matter)...

Of course from feature engineering, we will see if we can cut off the users
with large number of columns...

On Tue, Feb 17, 2015 at 1:58 PM, Xiangrui Meng men...@gmail.com wrote:

 It may be caused by GC pause. Did you check the GC time in the Spark
 UI? -Xiangrui

 On Sun, Feb 15, 2015 at 8:10 PM, Debasish Das debasish.da...@gmail.com
 wrote:
  Hi,
 
  I am sometimes getting WARN from running Similarity calculation:
 
  15/02/15 23:07:55 WARN BlockManagerMasterActor: Removing BlockManager
  BlockManagerId(7, abc.com, 48419, 0) with no recent heart beats: 66435ms
  exceeds 45000ms
 
  Do I need to increase the default 45 s to larger values for cases where
 we
  are doing blocked operation or long compute in the mapPartitions ?
 
  Thanks.
  Deb



Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-18 Thread Imran Rashid
Hi Tom,

there are a couple of things you can do here to make this more efficient.
 first, I think you can replace your self-join with a groupByKey. on your
example data set, this would give you

(1, Iterable(2,3))
(4, Iterable(3))

this reduces the amount of data that needs to be shuffled, and that way you
can produce all of your pairs just from the Iterable(2,3).

second, if you expect the same pairs to appear many times in your dataset,
you might first want to replace them with a count.  eg., if you start with

(1,2)
(1,2)
(1,2)
...
(1,2)
(1,3)
(1,3)
(4,3)
...

you might want to first convert that to get a count of each pair

val pairCounts = rdd.map{x => (x,1)}.reduceByKey{_ + _}

to give you something like:

((1,2), 145)
((1,3), 2)
((4,3), 982)
...

and then with a little more massaging you can group by key and also keep
the counts of each item:

val groupedCounts: RDD[(Int, Iterable[(Int,Int)])] =
  pairCounts.map{ case ((key, value), counts) =>
    key -> (value, counts)
  }.groupByKey

which would give you something like

(1, Iterable((2,145), (3, 2))
(4, Iterable((3, 982))
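
A sketch of the remaining step (not part of your original job, just to make the
idea concrete): from groupedCounts you can emit every value pair per key,
weighted by the product of the per-key counts, which reproduces what the
self-join + reduceByKey computed, duplicates and reflections included.

// For each key, pair every value with every value (including itself), weighting
// by the product of their per-key counts, then sum across keys.
val coOccurrenceCounts = groupedCounts.flatMap { case (_, valueCounts) =>
  for {
    (v1, c1) <- valueCounts
    (v2, c2) <- valueCounts
  } yield ((v1, v2), c1.toLong * c2.toLong)
}.reduceByKey(_ + _)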


hope this helps
Imran

On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote:

 Thanks for the reply, I'll try your suggestions.

 Apologies, in my previous post I was mistaken. rdd is actually an PairRDD
 of (Int, Int). I'm doing the self-join so I can count two things. First, I
 can count the number of times a value appears in the data set. Second, I can
 count the number of times values occur with the same key. For example, if I
 have the following:

 (1,2)
 (1,3)
 (4,3)

 Then joining with itself I get:

 (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
 (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
 (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
 (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
 (4,(3,3)) -> map -> ((3,3),1) _|

 Note that I want to keep the duplicates (2,2) and reflections.

 Rgds

 On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesnt need to keep everything up in memory.

 - Set your default Serializer to Kryo (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))

 - Enable rdd compression (.set("spark.rdd.compress","true"))
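
 Put together, that looks roughly like this (a sketch only; the app name is
 made up and rdd is the pair RDD from your snippet):

 import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel

 val conf = new SparkConf()
   .setAppName("cooccurrence-counts")  // illustrative name
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.rdd.compress", "true")

 // Persist serialized blocks and allow spilling to disk instead of keeping
 // everything deserialized in memory.
 val persisted = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)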


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
 snippet of code that is causing a OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions=1000)
    .map(fp => ((fp._2._1, fp._2._2), 1))
    .reduceByKey((x,y) => x+y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occuring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd is about 3.6 million records, which has a
 size in memory of about 120MB, and results in a 'joined' RDD (before the
 reduceByKey stage) of around 460 million records, with a size in memory of
 about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 the issue. I've also tried increasing the number of partitions after the
 join dramatically (up to 15000). Nothing has been effective. Thus, I'm
 beginning to suspect I don't have enough resources for the job.

 Does anyone have a feeling about what the resource requirements would be
 for a use-case like this? I could scale the cluster up if necessary, but
 would like to avoid it. I'm willing to accept longer computation times if
 that is an option.

 Warm Regards,
 Thomas






Re: Spark can't pickle class: error cannot lookup attribute

2015-02-18 Thread Davies Liu
Currently, PySpark cannot pickle a class object defined in the current
script ('__main__'). The workaround is to put the implementation
of the class into a separate module, then use bin/spark-submit
--py-files xxx.py to deploy it.

in xxx.py:

class test(object):
  def __init__(self, a, b):
self.total = a + b

in job.py:

from xxx import test
a = sc.parallelize([(True,False),(False,False)])
a.map(lambda (x,y): test(x,y))

run it by:

bin/spark-submit --py-files xxx.py job.py


On Wed, Feb 18, 2015 at 1:48 PM, Guillaume Guy
guillaume.c@gmail.com wrote:
 Hi,

 This is a duplicate of the stack-overflow question here. I hope to generate
 more interest  on this mailing list.


 The problem:

 I am running into some attribute lookup problems when trying to initiate a
 class within my RDD.

 My workflow is quite standard:

 1- Start with an RDD

 2- Take each element of the RDD, initiate an object for each

 3- Reduce (I will write a method that will define the reduce operation later
 on)

 Here is #2:

 class test(object):
 def __init__(self, a,b):
 self.total = a + b

 a = sc.parallelize([(True,False),(False,False)])
 a.map(lambda (x,y): test(x,y))

 Here is the error I get:

 PicklingError: Can't pickle <class '__main__.test'>: attribute lookup
 __main__.test failed

 I'd like to know if there is any way around it. Please, answer with a
 working example to achieve the intended results (i.e. creating a RDD of
 objects of class tests).

 Thanks in advance!

 Related question:

 https://groups.google.com/forum/#!topic/edx-code/9xzRJFyQwn


 GG


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JdbcRDD, ClassCastException with scala.Function0

2015-02-18 Thread Dmitry Goldenberg
That's exactly what I was doing. However, I ran into runtime issues with
doing that. For instance, I had a

  public class DbConnection extends AbstractFunction0<Connection>
implements Serializable

I got a runtime error from Spark complaining that DbConnection wasn't an
instance of scala.Function0.

I also had a

  public class MapRow extends
scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements
Serializable

with which I seemed to have more luck.

On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger c...@koeninger.org wrote:

 Cant you implement the

 org.apache.spark.api.java.function.Function

 interface and pass an instance of that to JdbcRDD.create ?

 On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Cody, you were right, I had a copy and paste snag where I ended up with a
 vanilla SparkContext rather than a Java one.  I also had to *not* use my
 function subclasses, rather just use anonymous inner classes for the
 Function stuff and that got things working. I'm fully following
 the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim.

 Is there a clean way to refactor out the custom Function classes such as
 the one for getting a db connection or mapping ResultSet data to your own
 POJO's rather than doing it all inline?
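
 For comparison, a minimal Scala sketch of the same call (connection URL and
 query taken from JavaJdbcRDDSuite, partition count picked arbitrarily); in
 Scala the connection factory and the row mapper are plain functions, so no
 AbstractFunction subclasses or explicit ClassTags are needed:

 import java.sql.{DriverManager, ResultSet}
 import org.apache.spark.rdd.JdbcRDD

 // Assumes an existing SparkContext named sc.
 val rows = new JdbcRDD(
   sc,
   () => DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"),
   "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
   1, 100, 3,
   (r: ResultSet) => r.getInt(1))

 rows.count()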


 On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Is sc there a SparkContext or a JavaSparkContext?  The compilation error
 seems to indicate the former, but JdbcRDD.create expects the latter

 On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 I have tried that as well, I get a compile error --

 [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for
 create(SparkContext,<anonymous ConnectionFactory>,String,int,int,int,
 <anonymous Function<ResultSet,Integer>>)

 The code is a copy and paste:

 JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
   sc,
   new JdbcRDD.ConnectionFactory() {
     public Connection getConnection() throws SQLException {
       return
 DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
     }
   },
   "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
   1, 100, 1,
   new Function<ResultSet, Integer>() {
     public Integer call(ResultSet r) throws Exception {
       return r.getInt(1);
     }
   }
 );

 The other thing I've tried was to define a static class locally for
 GetConnection and use the JdbcCreate constructor. This got around the
 compile issues but blew up at runtime with NoClassDefFoundError:
 scala/runtime/AbstractFunction0 !

 JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
 sc,
 (AbstractFunction0<Connection>) new DbConn(), // had to cast or a
 compile error
 SQL_QUERY,
 0L,
 1000L,
 10,
 new MapRow(),
 ROW_CLASS_TAG);
 // DbConn is defined as public static class DbConn extends
 AbstractFunction0<Connection> implements Serializable

 On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org
 wrote:

 That test I linked


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90

 is calling a static method JdbcRDD.create, not new JdbcRDD.  Is that
 what you tried doing?

 On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg 
 dgoldenberg...@gmail.com wrote:

 Thanks, Cody. Yes, I originally started off by looking at that but I
 get a compile error if I try and use that approach: constructor JdbcRDD in
 class JdbcRDD<T> cannot be applied to given types.  Not to mention that
 JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last
 argument).

 Wonder if it's a JDK version issue, I'm using 1.7.

 So I've got this, which doesn't compile

 JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
 new SparkContext(conf),
 new JdbcRDD.ConnectionFactory() {
 public Connection getConnection() throws SQLException {
 Connection conn = null;
 try {
 Class.forName(JDBC_DRIVER);
 conn = DriverManager.getConnection(JDBC_URL, JDBC_USER,
 JDBC_PASSWORD);
 } catch (ClassNotFoundException ex) {
 throw new RuntimeException("Error while loading JDBC driver.", ex);
 }
 return conn;
 }
 },
 "SELECT * FROM EMPLOYEES",
 0L,
 1000L,
 10,
 new Function<ResultSet, Row>() {
 public Row call(ResultSet r) throws Exception {
 return null; // have some actual logic here...
 }
 },
 scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));

 The other approach was mimicking the DbConnection class from this
 post:
 http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
 It got around any of the compilation issues but then I got the runtime
 error where Spark wouldn't recognize the db connection class as a
 scala.Function0.



 On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Take a look at


 https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java



 On Wed, Feb 18, 2015 at 

Re: ML Transformer

2015-02-18 Thread Joseph Bradley
Hi Cesar,

Thanks for trying out Pipelines and bringing up this issue!  It's been an
experimental API, but feedback like this will help us prepare it for
becoming non-Experimental.  I've made a JIRA, and will vote for this being
protected (instead of private[ml]) for Spark 1.3:
https://issues.apache.org/jira/browse/SPARK-5902

Thanks again,
Joseph

On Wed, Feb 18, 2015 at 12:17 PM, Cesar Flores ces...@gmail.com wrote:


 I am working right now with the ML pipeline, which I really like it.
 However in order to make a real use of it, I would like create my own
 transformers that implements org.apache.spark.ml.Transformer. In order to
 do that, a method from the PipelineStage needs to be implemented. But this
 method is private to the ml package:

 private[ml] def transformSchema(schema: StructType, paramMap: ParamMap):
 StructType

 Do any user can create their own transformers? If not, do this
 functionality will be added in the future.

 Thanks
 --
 Cesar Flores



No suitable driver found error, Create table in hive from spark sql

2015-02-18 Thread Dhimant
No suitable driver found error, Create table in hive from spark sql.

I am trying to execute following example.
SPARKGIT:
spark/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala

My setup :- hadoop 1.6,spark 1.2, hive 1.0, mysql server (installed via yum
install mysql55w mysql55w-server)

I can create tables in hive from hive command prompt.
/
hive select * from person_parquet;
OK
Barack  Obama   M
BillClinton M
Hillary Clinton F
Time taken: 1.945 seconds, Fetched: 3 row(s)
/

I am starting spark shell via following command:-

./spark-1.2.0-bin-hadoop2.4/bin/spark-shell --master
spark://sparkmaster.company.com:7077 --jars
/data/mysql-connector-java-5.1.14-bin.jar

/
scala> Class.forName("com.mysql.jdbc.Driver")
res0: Class[_] = class com.mysql.jdbc.Driver

scala> Class.forName("com.mysql.jdbc.Driver").newInstance
res1: Any = com.mysql.jdbc.Driver@2dec8e27

scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@32ecf100

scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value
STRING)")
15/02/18 22:23:01 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF
NOT EXISTS src (key INT, value STRING)
15/02/18 22:23:02 INFO parse.ParseDriver: Parse Completed
15/02/18 22:23:02 INFO metastore.HiveMetaStore: 0: Opening raw store with
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/02/18 22:23:02 INFO metastore.ObjectStore: ObjectStore, initialize called
15/02/18 22:23:02 INFO DataNucleus.Persistence: Property
datanucleus.cache.level2 unknown - will be ignored
15/02/18 22:23:02 INFO DataNucleus.Persistence: Property
hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/02/18 22:23:02 WARN DataNucleus.Connection: BoneCP specified but not
present in CLASSPATH (or one of dependencies)
15/02/18 22:23:02 WARN DataNucleus.Connection: BoneCP specified but not
present in CLASSPATH (or one of dependencies)
15/02/18 22:23:02 ERROR Datastore.Schema: Failed initialising database.
No suitable driver found for jdbc:mysql://sparkmaster.company.com:3306/hive
org.datanucleus.exceptions.NucleusDataStoreException: No suitable driver
found for jdbc:mysql://sparkmaster.company.com:3306/hive
at
org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:516)
at
org.datanucleus.store.rdbms.RDBMSStoreManager.init(RDBMSStoreManager.java:298)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
at
org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
at
org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1187)
at
org.datanucleus.NucleusContext.initialise(NucleusContext.java:356)
at
org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:775)
at
org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
at
org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
at
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
at
javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at
javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339)
at
org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248)
at
org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223)
at
org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at

Re: Processing graphs

2015-02-18 Thread Vijayasarathy Kannan
Hi,

Thanks for your reply.

I basically want to check if my understanding of what parallelize() on RDDs
does is correct. In my case, I create a vertex RDD and an edge RDD and distribute them
by calling parallelize(). Now does Spark perform any operation on these
RDDs in parallel?

For example, if I apply groupBy on the edge RDD (grouping by source vertex)
and call a function F on the grouped RDD, will F be applied on each group
in parallel and will Spark determine how to do this in parallel regardless
of the number of groups?

Thanks.
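
As a concrete illustration of that model (a sketch only; sc is an existing
SparkContext, and the vertex IDs and the process function stand in for F):

import org.apache.spark.SparkContext._

// Edges as (srcVertex, dstVertex) pairs, spread across 4 partitions.
val edges = sc.parallelize(Seq((1L, 2L), (1L, 3L), (2L, 3L)), numSlices = 4)

// Group by source vertex: one record per source, carrying its outgoing edges.
val grouped = edges.groupByKey()

// Stand-in for F, applied to each (vertex, outgoing edges) group. The work runs
// as one task per partition, in parallel, independent of the number of groups.
def process(src: Long, outgoing: Iterable[Long]): (Long, Int) = (src, outgoing.size)

val results = grouped.map { case (src, dsts) => process(src, dsts) }
results.collect().foreach(println)  // results come back to the driver here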

On Tue, Feb 17, 2015 at 5:03 PM, Yifan LI iamyifa...@gmail.com wrote:

 Hi Kannan,

 I am not sure I have understood what your question is exactly, but maybe
 the reduceByKey or reduceByKeyLocally functionality is better to your need.



 Best,
 Yifan LI





 On 17 Feb 2015, at 17:37, Vijayasarathy Kannan kvi...@vt.edu wrote:

 Hi,

 I am working on a Spark application that processes graphs and I am trying
 to do the following.

 - group the vertices (key - vertex, value - set of its outgoing edges)
 - distribute each key to separate processes and process them (like mapper)
 - reduce the results back at the main process

 Does the groupBy functionality do the distribution by default?
 Do we have to explicitly use RDDs to enable automatic distribution?

 It'd be great if you could help me understand these and how to go about
 with the problem.

 Thanks.





Re: Spark Streaming output cannot be used as input?

2015-02-18 Thread Tim Smith
+1 for writing the Spark output to Kafka. You can then hang off multiple
compute/storage framework from kafka. I am using a similar pipeline to feed
ElasticSearch and HDFS in parallel. Allows modularity, you can take down
ElasticSearch or HDFS for maintenance without losing (except for some edge
cases) data.

You can even pipeline other Spark streaming apps off kafka to modularize
your processing pipeline so you don't have one single big Spark app doing
all the processing.



On Wed, Feb 18, 2015 at 3:34 PM, Jose Fernandez jfernan...@sdl.com wrote:

  Thanks for the advice folks, it is much appreciated. This seems like a
 pretty unfortunate design flaw. My team was surprised by it.



 I’m going to drop the two-step process and do it all in a single step
 until we get Kafka online.



 *From:* Sean Owen [mailto:so...@cloudera.com]
 *Sent:* Wednesday, February 18, 2015 1:53 AM
 *To:* Emre Sevinc
 *Cc:* Jose Fernandez; user@spark.apache.org
 *Subject:* Re: Spark Streaming output cannot be used as input?



 To clarify, sometimes in the world of Hadoop people freely refer to an
 output 'file' when it's really a directory containing 'part-*' files which
 are pieces of the file. It's imprecise but that's the meaning. I think the
 scaladoc may be referring to 'the path to the file, which includes this
 parent dir, is generated ...' In an inherently distributed system, you want
 to distribute writes and reads, so big files are really made of logical
 files within a directory.



 There is a JIRA open to support nested dirs which has been languishing:
 https://issues.apache.org/jira/browse/SPARK-3586

 I'm hoping to pursue that again with help from tdas after 1.3.

 That's probably the best solution.



 An alternative is to not use the file system as a sort of message queue,
 and instead use something like Kafka. It has a lot of other benefits but
 maybe it's not feasible to add this to your architecture.



 You can merge the files with HDFS APIs without much trouble. The dirs will
 be named consistently according to time and are something you can also
 query for.



 Making 1 partition has implications for parallelism of your job.



 Emre, I think I see what you're getting at but you have the map +
 materialize pattern which i think doesn't have the right guarantees about
 re-execution. Why not foreachRDD?



 Yes you can also consider collecting the whole RDD in foreachRDD and doing
 what you like, including writing to one file. But that would only work if
 the data is always small in each RDD.






   SDL PLC confidential, all rights reserved. If you are not the intended
 recipient of this mail SDL requests and requires that you delete it without
 acting upon or copying any of its contents, and we further request that you
 advise us.

 SDL PLC is a public limited company registered in England and Wales.
 Registered number: 02675207.
 Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
 7DY, UK.

 On Wed, Feb 18, 2015 at 8:50 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello Jose,

 We've hit the same issue a couple of months ago. It is possible to write
 directly to files instead of creating directories, but it is not
 straightforward, and I haven't seen any clear demonstration in books,
 tutorials, etc.

 We do something like:

 SparkConf sparkConf = new SparkConf().setAppName(appName);
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
 Duration(batchInterval));
 JavaDStreamString stream = MyModuleApp.initializeJob(ssc);
 MyModuleApp.process(stream);



 And then in the process method:

 @Override public void process(JavaDStreamString inStream) {



 JavaDStreamString json = inStream.map(new 
 MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, 
 rejectedJSONoutputDir));

 forceOutput(json);

   }

  This, in turn, calls the following (I've removed the irrelevant lines to 
 focus on writing):


 public class MyModuleWorker implements FunctionString,String {

   public String call(String json) {


 // process the data and then write it

 writeJSON(json, validatedJSONoutputDir_);

   }



 }

 And the writeJSON method is:

 public static final void writeJSON(String json, String jsonDirPath) throws 
 IOException {

 String jsonFileName = jsonDirPath + / + UUID.randomUUID().toString() + 
 .json.tmp;

 URI uri = URI.create(jsonFileName);

 Configuration conf = new Configuration();

 FileSystem fileSystem = FileSystem.get(uri, conf);

 FSDataOutputStream out = fileSystem.create(new Path(uri));

 out.write(json.getBytes(StandardCharsets.UTF_8));

 out.close();



 fileSystem.rename(new Path(uri),

 new Path(URI.create(jsonDirPath + / + 
 UUID.randomUUID().toString() + .json)));



   }



 Using a similar technique you might be able to achieve your objective.

 Kind regards,

 Emre Sevinç
 http://www.bigindustries.be/





 On Wed, Feb 18, 2015 at 4:32 AM, Jose 

Re: Spark and Spark Streaming code sharing best practice.

2015-02-18 Thread Jean-Pascal Billaud
Thanks Arush. I will check that out.

On Wed, Feb 18, 2015 at 11:06 AM, Arush Kharbanda 
ar...@sigmoidanalytics.com wrote:

 I find monoids pretty useful in this respect, basically separating out the
 logic in a monoid and then applying the logic to either a stream or a
 batch. A list of such practices could be really useful.
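
 A sketch of that separation (names are illustrative, not from a real project):
 the combining logic lives in one object, written against plain RDDs, and is
 applied unchanged to a DStream through transform.

 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.dstream.DStream

 // Shared logic: keying plus an associative, commutative combine (a monoid on
 // Long under addition), written once against RDDs.
 object WordCountLogic {
   def apply(lines: RDD[String]): RDD[(String, Long)] =
     lines.flatMap(_.split("\\s+")).map((_, 1L)).reduceByKey(_ + _)
 }

 // Batch: apply it directly to an RDD.
 def runBatch(lines: RDD[String]): RDD[(String, Long)] = WordCountLogic(lines)

 // Streaming: apply the same logic to every microbatch RDD.
 def runStreaming(lines: DStream[String]): DStream[(String, Long)] =
   lines.transform(rdd => WordCountLogic(rdd))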

 On Thu, Feb 19, 2015 at 12:26 AM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hey,

 It seems pretty clear that one of the strength of Spark is to be able to
 share your code between your batch and streaming layer. Though, given that
 Spark streaming uses DStream being a set of RDDs and Spark uses a single
 RDD there might some complexity associated with it.

 Of course since DStream is a superset of RDDs, one can just run the same
 code at the RDD granularity using DStream::forEachRDD. While this should
 work for map, I am not sure how that can work when it comes to reduce phase
 given that a group of keys spans across multiple RDDs.

 One of the options is to change the dataset object on which a job works.
 For example, instead of passing an RDD to a class method, one passes a higher
 level object (MetaRDD) that wraps around RDD or DStream depending on the
 context. At this point the job calls its regular maps, reduces and so on
 and the MetaRDD wrapper would delegate accordingly.

 Just would like to know the official best practice from the spark
 community though.

 Thanks,




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com



Re: No suitable driver found error, Create table in hive from spark sql

2015-02-18 Thread Dhimant
Found solution from one of the post found on internet.
I updated spark/bin/compute-classpath.sh and added database connector jar
into classpath.
CLASSPATH=$CLASSPATH:/data/mysql-connector-java-5.1.14-bin.jar
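
A possible alternative (untested here, but commonly suggested for this error) is
to put the connector on the driver classpath at launch instead of editing the
script, since the metastore connection is opened by the driver:

./spark-1.2.0-bin-hadoop2.4/bin/spark-shell \
  --master spark://sparkmaster.company.com:7077 \
  --driver-class-path /data/mysql-connector-java-5.1.14-bin.jar \
  --jars /data/mysql-connector-java-5.1.14-bin.jar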



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-suitable-driver-found-error-Create-table-in-hive-from-spark-sql-tp21714p21715.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark Streaming output cannot be used as input?

2015-02-18 Thread Jose Fernandez
Thanks for the advice folks, it is much appreciated. This seems like a pretty 
unfortunate design flaw. My team was surprised by it.

I’m going to drop the two-step process and do it all in a single step until we 
get Kafka online.

From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, February 18, 2015 1:53 AM
To: Emre Sevinc
Cc: Jose Fernandez; user@spark.apache.org
Subject: Re: Spark Streaming output cannot be used as input?

To clarify, sometimes in the world of Hadoop people freely refer to an output 
'file' when it's really a directory containing 'part-*' files which are pieces 
of the file. It's imprecise but that's the meaning. I think the scaladoc may be 
referring to 'the path to the file, which includes this parent dir, is 
generated ...' In an inherently distributed system, you want to distributed 
writes and reads, so big files are really made of logical files within a 
directory.

There is a JIRA open to support nested dirs which has been languishing: 
https://issues.apache.org/jira/browse/SPARK-3586
I'm hoping to pursue that again with help from tdas after 1.3.
That's probably the best solution.

An alternative is to not use the file system as a sort of message queue, and 
instead use something like Kafka. It has a lot of other benefits but maybe it's 
not feasible to add this to your architecture.

You can merge the files with HDFS APIs without much trouble. The dirs will be 
named consistently according to time and are something you can also query for.

Making 1 partition has implications for parallelism of your job.

Emre, I think I see what you're getting at but you have the map + materialize 
pattern which i think doesn't have the right guarantees about re-execution. Why 
not foreachRDD?

Yes you can also consider collecting the whole RDD in foreachRDD and doing what 
you like, including writing to one file. But that would only work if the data 
is always small in each RDD.



SDL PLC confidential, all rights reserved. If you are not the intended 
recipient of this mail SDL requests and requires that you delete it without 
acting upon or copying any of its contents, and we further request that you 
advise us.

SDL PLC is a public limited company registered in England and Wales. Registered 
number: 02675207.
Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6 7DY, 
UK.

On Wed, Feb 18, 2015 at 8:50 AM, Emre Sevinc 
emre.sev...@gmail.commailto:emre.sev...@gmail.com wrote:
Hello Jose,
We've hit the same issue a couple of months ago. It is possible to write 
directly to files instead of creating directories, but it is not 
straightforward, and I haven't seen any clear demonstration in books, 
tutorials, etc.
We do something like:

SparkConf sparkConf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new 
Duration(batchInterval));
JavaDStreamString stream = MyModuleApp.initializeJob(ssc);
MyModuleApp.process(stream);

And then in the process method:

@Override public void process(JavaDStreamString inStream) {



JavaDStreamString json = inStream.map(new MyModuleWorker(jsonSchemaName, 
validatedJSONoutputDir, rejectedJSONoutputDir));

forceOutput(json);

  }


This, in turn, calls the following (I've removed the irrelevant lines to focus 
on writing):


public class MyModuleWorker implements FunctionString,String {

  public String call(String json) {

// process the data and then write it

writeJSON(json, validatedJSONoutputDir_);

  }



}

And the writeJSON method is:

public static final void writeJSON(String json, String jsonDirPath) throws 
IOException {

String jsonFileName = jsonDirPath + / + UUID.randomUUID().toString() + 
.json.tmp;

URI uri = URI.create(jsonFileName);

Configuration conf = new Configuration();

FileSystem fileSystem = FileSystem.get(uri, conf);

FSDataOutputStream out = fileSystem.create(new Path(uri));

out.write(json.getBytes(StandardCharsets.UTF_8));

out.close();



fileSystem.rename(new Path(uri),

new Path(URI.create(jsonDirPath + / + 
UUID.randomUUID().toString() + .json)));



  }

Using a similar technique you might be able to achieve your objective.
Kind regards,
Emre Sevinç
http://www.bigindustries.be/


On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez 
jfernan...@sdl.commailto:jfernan...@sdl.com wrote:
Hello folks,

Our intended use case is:

-  Spark Streaming app #1 reads from RabbitMQ and output to HDFS

-  Spark Streaming app #2 reads #1’s output and stores the data into 
Elasticsearch

The idea behind this architecture is that if Elasticsearch is down due to an 
upgrade or system error we don’t have to stop reading messages from the queue. 
We could also scale each process separately as needed.

After a few hours research my understanding is that Spark Streaming outputs 
files in a *directory* for which 

Re: Spark Streaming and message ordering

2015-02-18 Thread jay vyas
This is a *fantastic* question.  The idea of how we identify individual
things in multiple  DStreams is worth looking at.

The reason being, that you can then fine tune your streaming job, based on
the RDD identifiers (i.e. are the timestamps from the producer correlating
closely to the order in which RDD elements are being produced) ?  If *NO*
then you need to (1) dial up throughput on producer sources or else (2)
increase cluster size so that spark is capable of evenly handling load.

You can't decide to do (1) or (2) unless you can track when the streaming
elements are being converted to RDDs by Spark itself.



On Wed, Feb 18, 2015 at 6:54 PM, Neelesh neele...@gmail.com wrote:

 There does not seem to be a definitive answer on this. Every time I google
 for message ordering,the only relevant thing that comes up is this  -
 http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html
 .

 With a kafka receiver that pulls data from a single kafka partition of a
 kafka topic, are individual messages in the microbatch in same the order as
 kafka partition? Are successive microbatches originating from a kafka
 partition executed in order?


 Thanks!





-- 
jay vyas


RE: NotSerializableException: org.apache.http.impl.client.DefaultHttpClient when trying to send documents to Solr

2015-02-18 Thread Jose Fernandez
You need to instantiate the server in the forEachPartition block or Spark will 
attempt to serialize it to the task. See the design patterns section in the 
Spark Streaming guide.
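
A minimal sketch of that pattern in Scala (the URL handling and batch size are
illustrative; this mirrors the pattern rather than the exact Java code below):
the client is constructed inside foreachPartition, so it lives entirely on the
executor and never has to be serialized with the task.

import scala.collection.JavaConverters._
import org.apache.solr.client.solrj.impl.HttpSolrServer
import org.apache.solr.common.SolrInputDocument
import org.apache.spark.rdd.RDD

def indexPartitions(docs: RDD[SolrInputDocument], solrUrl: String, batchSize: Int): Unit = {
  docs.foreachPartition { partition =>
    val server = new HttpSolrServer(solrUrl)    // one client per partition/task
    partition.grouped(batchSize).foreach { batch =>
      server.add(batch.asJava)                  // send each batch of documents
    }
    server.commit()
    server.shutdown()
  }
}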


Jose Fernandez | Principal Software Developer
jfernan...@sdl.com |

The information transmitted, including attachments, is intended only for the 
person(s) or entity to which it is addressed and may contain confidential 
and/or privileged material. Any review, retransmission, dissemination or other 
use of, or taking of any action in reliance upon this information by persons or 
entities other than the intended recipient is prohibited. If you received this 
in error, please contact the sender and destroy any copies of this information.



-Original Message-
From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Wednesday, February 18, 2015 1:54 PM
To: user@spark.apache.org
Subject: NotSerializableException: 
org.apache.http.impl.client.DefaultHttpClient when trying to send documents to 
Solr

I'm using Solrj in a Spark program. When I try to send the docs to Solr, I get 
the NotSerializableException on the DefaultHttpClient.  Is there a possible fix 
or workaround?

I'm using Spark 1.2.1 with Hadoop 2.4, SolrJ is version 4.0.0.

final HttpSolrServer solrServer = new HttpSolrServer(SOLR_SERVER_URL); ...
JavaRDD<SolrInputDocument> solrDocs = rdd.map(new Function<Row, SolrInputDocument>() {
    public SolrInputDocument call(Row r) {
        return r.toSolrDocument();
    }
});

solrDocs.foreachPartition(new VoidFunction<Iterator<SolrInputDocument>>() {
    public void call(Iterator<SolrInputDocument> solrDocIterator) throws Exception {
        List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>();

        while (solrDocIterator.hasNext()) {
            SolrInputDocument inputDoc = solrDocIterator.next();
            batch.add(inputDoc);
            if (batch.size() >= batchSize) {
                Utils.sendBatchToSolr(solrServer, solrCollection, batch);
            }
        }
        if (!batch.isEmpty()) {
            Utils.sendBatchToSolr(solrServer, solrCollection, batch);
        }
    }
});



Exception in thread main org.apache.spark.SparkException: Task not 
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:789)
at
org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:195)
at
org.apache.spark.api.java.JavaRDD.foreachPartition(JavaRDD.scala:32)
at
com.kona.motivis.spark.proto.SparkProto.execute(SparkProto.java:158)
at com.kona.motivis.spark.proto.SparkProto.main(SparkProto.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException:
org.apache.http.impl.client.DefaultHttpClient
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at

Spark Streaming and message ordering

2015-02-18 Thread Neelesh
There does not seem to be a definitive answer on this. Every time I google
for message ordering, the only relevant thing that comes up is this  -
http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html
.

With a kafka receiver that pulls data from a single kafka partition of a
kafka topic, are individual messages in the microbatch in the same order as
the kafka partition? Are successive microbatches originating from a kafka
partition executed in order?


Thanks!