Does spark *always* fork its workers?
I want to map over a Cassandra table in Spark, but the code that executes needs a shutdown() call to terminate its threads, release file handles, etc. Will Spark always execute my mappers as a forked process? And if so, how do I handle threads preventing the JVM from terminating? It would be nice if there were a way to clean up after yourself gracefully in map jobs, but I don't think that exists right now. -- Founder/CEO Spinn3r.com Location: *San Francisco, CA* blog: http://burtonator.wordpress.com … or check out my Google+ profile https://plus.google.com/102718274791889610666/posts http://spinn3r.com
Re: Spark Streaming output cannot be used as input?
Hello Jose,

We hit the same issue a couple of months ago. It is possible to write directly to files instead of creating directories, but it is not straightforward, and I haven't seen any clear demonstration of it in books, tutorials, etc. We do something like:

  SparkConf sparkConf = new SparkConf().setAppName(appName);
  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(batchInterval));
  JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
  MyModuleApp.process(stream);

And then in the process method:

  @Override
  public void process(JavaDStream<String> inStream) {
    JavaDStream<String> json = inStream.map(
        new MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir));
    forceOutput(json);
  }

This, in turn, calls the following (I've removed the irrelevant lines to focus on writing):

  public class MyModuleWorker implements Function<String, String> {
    public String call(String json) {
      // process the data and then write it
      writeJSON(json, validatedJSONoutputDir_);
    }
  }

And the writeJSON method is:

  public static final void writeJSON(String json, String jsonDirPath) throws IOException {
    String jsonFileName = jsonDirPath + "/" + UUID.randomUUID().toString() + ".json.tmp";
    URI uri = URI.create(jsonFileName);
    Configuration conf = new Configuration();
    FileSystem fileSystem = FileSystem.get(uri, conf);
    FSDataOutputStream out = fileSystem.create(new Path(uri));
    out.write(json.getBytes(StandardCharsets.UTF_8));
    out.close();
    fileSystem.rename(new Path(uri),
        new Path(URI.create(jsonDirPath + "/" + UUID.randomUUID().toString() + ".json")));
  }

Using a similar technique you might be able to achieve your objective.

Kind regards,
Emre Sevinç
http://www.bigindustries.be/

On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez jfernan...@sdl.com wrote:

Hello folks,

Our intended use case is:
- Spark Streaming app #1 reads from RabbitMQ and outputs to HDFS
- Spark Streaming app #2 reads #1's output and stores the data into Elasticsearch

The idea behind this architecture is that if Elasticsearch is down due to an upgrade or system error, we don't have to stop reading messages from the queue. We could also scale each process separately as needed.

After a few hours of research, my understanding is that Spark Streaming outputs files into a *directory* for which you provide the prefix and suffix. This is despite the ScalaDoc for DStream.saveAsObjectFiles suggesting otherwise:

  /**
   * Save each RDD in this DStream as a Sequence file of serialized objects.
   * The file name at each batch interval is generated based on `prefix` and
   * `suffix`: "prefix-TIME_IN_MS.suffix".
   */

Spark Streaming can monitor an HDFS directory for files, but subfolders are not supported. So as far as I can tell, it is not possible to use Spark Streaming output as input for a different Spark Streaming app without somehow performing a separate operation in the middle.

Am I missing something obvious? I've read some suggestions like using Hadoop to merge the directories (whose names I don't see how you would know) and reducing the partitions to 1 (which wouldn't help).

Any other suggestions? What is the expected pattern a developer would follow that would make Spark Streaming's output format usable?

www.sdl.com
-- Emre Sevinc
Cannot access Spark web UI
Hello Experts, I am running a spark-streaming app inside YARN. I have Spark History server running as well (Do we need it running to access UI?). The app is running fine as expected but the Spark's web UI is not accessible. When I try to access the ApplicationMaster of the Yarn application I get the below error. This looks very similar to https://issues.apache.org/jira/browse/SPARK-5837 but instead of java.net.ConnectException: Connection refused I am getting java.net.BindException: Cannot assign requested address as shown below. Please let me know if you have faced / fixed this issue, any help is greatly appreciated. *Exception* HTTP ERROR 500 Problem accessing /proxy/application_1424161379156_0001/. Reason: Cannot assign requested address Caused by: java.net.BindException: Cannot assign requested address at java.net.PlainSocketImpl.socketBind(Native Method) at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376) at java.net.Socket.bind(Socket.java:631) at java.net.Socket.init(Socket.java:423) at java.net.Socket.init(Socket.java:280) at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:80) at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:122) at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707) at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387) at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:346) at org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:188) at org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:345) at javax.servlet.http.HttpServlet.service(HttpServlet.java:707) at javax.servlet.http.HttpServlet.service(HttpServlet.java:820) at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66) at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900) at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834) at org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:84) at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795) at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118) at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:592) at 
org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:555) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1223) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399) at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216) at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182) at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:767) at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450) at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230) at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152) at org.mortbay.jetty.Server.handle(Server.java:326) at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542) at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928) at
Re: Cannot access Spark web UI
The error says "Cannot assign requested address". This means that you need to use the correct address for one of your network interfaces, or 0.0.0.0 to accept connections from all interfaces. Can you paste your spark-env.sh and /etc/hosts files?

Thanks
Best Regards

On Wed, Feb 18, 2015 at 2:06 PM, Mukesh Jha me.mukesh@gmail.com wrote:

Hello Experts, I am running a spark-streaming app inside YARN. I have the Spark History server running as well (do we need it running to access the UI?). The app is running fine as expected, but Spark's web UI is not accessible. When I try to access the ApplicationMaster of the YARN application I get the error above. This looks very similar to https://issues.apache.org/jira/browse/SPARK-5837 but instead of "java.net.ConnectException: Connection refused" I am getting "java.net.BindException: Cannot assign requested address". Please let me know if you have faced / fixed this issue; any help is greatly appreciated.
RE: spark-core in a servlet
Check your dependencies. It looks like you have a conflict around the servlet-api jars. Maven's dependency:tree, some exclusions and some luck :) could help.

From: Ralph Bergmann | the4thFloor.eu [ra...@the4thfloor.eu]
Sent: Tuesday, February 17, 2015 4:14 PM
To: user@spark.apache.org
Subject: spark-core in a servlet

Hi,

I want to use spark-core inside of an HttpServlet. I use Maven for the build, but I have a dependency problem :-( I get this error message:

  ClassCastException: com.sun.jersey.server.impl.container.servlet.JerseyServletContainerInitializer cannot be cast to javax.servlet.ServletContainerInitializer

When I add these exclusions it builds, but then other classes are not found at runtime:

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.2.1</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.eclipse.jetty</groupId>
        <artifactId>*</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

What can I do? Thanks a lot!, Ralph

-- Ralph Bergmann iOS and Android app developer www http://www.the4thFloor.eu mail ra...@the4thfloor.eu skype dasralph google+ https://plus.google.com/+RalphBergmann xing https://www.xing.com/profile/Ralph_Bergmann3 linkedin https://www.linkedin.com/in/ralphbergmann gulp https://www.gulp.de/Profil/RalphBergmann.html github https://github.com/the4thfloor pgp key id 0x421F9B78 pgp fingerprint CEE3 7AE9 07BE 98DF CD5A E69C F131 4A8E 421F 9B78
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Hello Imran,

(a) I know that all 20 files are processed when I use foreachRDD, because I can see the processed files in the output directory. (My application logic writes them to an output directory after they are processed, *but* that writing operation does not happen in foreachRDD; below you can see the URL that includes my code and clarifies this.)

(b) I know only 16 files are processed because in the output directory I see only 16 processed files. I wait for minutes and minutes and no more files appear in the output directory. Once only 16 files have been processed and Spark Streaming has gone into the mode of idly watching the input directory, if I then copy a few more files into it, they are also processed.

(c) Sure, you can see part of my code in the following gist: https://gist.github.com/emres/0fb6de128baea099e741 It might seem a little convoluted at first, because my application is divided into two classes, a Driver class (setting things up and initializing them) and a Worker class (that implements the core functionality). I've also included the relevant methods from my utility classes for completeness.

I am as perplexed as you are as to why forcing the output via foreachRDD resulted in different behaviour compared to simply using the print() method.

Kind regards,
Emre

On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras...@cloudera.com wrote:

Hi Emre, there shouldn't be any difference in which files get processed w/ print() vs. foreachRDD(). In fact, if you look at the definition of print(), it is just calling foreachRDD() underneath. So there is something else going on here. We need a little more information to figure out exactly what is going on. (I think Sean was getting at the same thing ...)

(a) How do you know that when you use foreachRDD, all 20 files get processed?
(b) How do you know that only 16 files get processed when you print()? Do you know the other files are being skipped, or maybe they are just stuck somewhere? E.g., suppose you start w/ 20 files, and you see 16 get processed ... what happens after you add a few more files to the directory? Are they processed immediately, or are they never processed either?
(c) Can you share any more code of what you are doing to the dstreams *before* the print() / foreachRDD()? That might give us more details about what the difference is.

I can't see how .count.println() would be different than just println(), but maybe I am missing something also.

Imran

On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com wrote:

Sean,

In this case, I've been testing the code on my local machine and using Spark locally, so all the log output was available on my terminal. I've used the .print() method to have an output operation, just to force Spark to execute. I was not using foreachRDD; I was only using the print() method on a JavaDStream object, and it was working fine for a few files, up to 16 (and without print() it did not do anything, because there were no output operations).

To sum it up, in my case:
- Initially, use .print() and no foreachRDD: processes up to 16 files and does not do anything for the remaining 4.
- Remove .print() and use foreachRDD: processes all 20 files.

Maybe, as in Akhil Das's suggestion, using .count.print() might also have fixed my problem, but I'm satisfied with the foreachRDD approach for now.
(Though it is still a mystery to me why using .print() made a difference. Maybe my mental model of Spark is wrong; I thought that no matter what output operation I used, the number of files processed by Spark would be independent of it, because the processing is done in a different method and .print() is only used to force Spark to execute that processing. Am I wrong?)

-- Emre

On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote:

Materialization shouldn't be relevant. The collect by itself doesn't let you detect whether it happened. Print should print some results to the console, but on different machines, so it may not be a reliable way to see what happened.

Yes, I understand your real process uses foreachRDD and that's what you should use. It sounds like that works. But you must have always been using that, right? What do you mean that you changed to use it? Basically I'm not clear on what the real code does and what about the output of that code tells you only 16 files were processed.

On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote:

Hello Sean,

I did not understand your question very well, but what I do is check the output directory (and I have various logger outputs at various stages showing the contents of an input file being processed, the response from the web service, etc.).

By the way, I've already solved my problem by using foreachRDD instead of print (see my second message in this thread). Apparently forcing Spark to materialize the DAG via print() is not the way to go. (My interpretation might be
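A minimal sketch of the foreachRDD-based forcing discussed in this thread, assuming a JavaDStream<String> named json as in the linked code (the log message is only illustrative; this uses the Spark 1.2-era Java API, where foreachRDD takes a Function returning Void):

  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.function.Function;

  // given: JavaDStream<String> json
  json.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) {
      // count() is an action, so it computes every partition of the batch,
      // whereas print() only needs the first few elements of each batch
      long records = rdd.count();
      System.out.println("Batch contained " + records + " records");
      return null;
    }
  });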
Re: Re: Problem with 1 master + 2 slaves cluster
On Wed, Feb 18, 2015 at 10:23 AM, bit1...@163.com bit1...@163.com wrote: Sure, thanks Akhil. A further question: Is the local file system (file:///) not supported in a standalone cluster? FYI: I'm able to write to the local file system (via the HDFS API, using file:/// notation) when using Spark. -- Emre Sevinç http://www.bigindustries.be/
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
Thanks to everyone for the suggestions and explanations. Currently I've started to experiment with the following scenario, which seems to work for me:

- Put the properties file on a web server so that it is centrally available
- Pass it to the Spark driver program via --conf 'propertiesFile=http://myWebServer.com/mymodule.properties'
- And then load the configuration using Apache Commons Configuration:

  PropertiesConfiguration config = new PropertiesConfiguration();
  config.load(System.getProperty("propertiesFile"));

Using the method described above, I don't need to statically compile my properties file into the über JAR anymore. I can modify the file on the web server, and when I submit my application via spark-submit, passing the URL of the properties file, the driver program reads the contents of that file once, retrieves the values of the keys and continues.

PS: I've opted for Apache Commons Configuration because it is already part of the many dependencies I have in my pom.xml and I did not want to pull in another library, even though the Typesafe Config library seems to be a powerful and flexible choice, too.

-- Emre

On Tue, Feb 17, 2015 at 6:12 PM, Charles Feduke charles.fed...@gmail.com wrote:

Emre,

As you are keeping the properties file external to the JAR, you need to make sure to submit the properties file as an additional --files (or whatever the necessary CLI switch is) so all the executors get a copy of the file along with the JAR.

If you know you are going to just put the properties file on HDFS, then why don't you define a custom system setting like properties.url and pass it along (this is for the Spark shell, the only CLI string I have available at the moment):

  spark-shell --jars $JAR_NAME \
    --conf 'properties.url=hdfs://config/stuff.properties' \
    --conf 'spark.executor.extraJavaOptions=-Dproperties.url=hdfs://config/stuff.properties'

... then load the properties file during initialization by examining the properties.url system setting.

I'd still strongly recommend Typesafe Config as it makes this a lot less painful, and I know for certain you can place your *.conf at a URL (using -Dconfig.url=), though it probably won't work with an HDFS URL.

On Tue Feb 17 2015 at 9:53:08 AM Gerard Maas gerard.m...@gmail.com wrote:

+1 for Typesafe Config.

Our practice is to include all spark properties under a 'spark' entry in the config file alongside job-specific configuration. A config file would look like:

  spark {
    master = ""
    cleaner.ttl = 123456
    ...
  }
  job {
    context {
      src = foo
      action = barAction
    }
    prop1 = val1
  }

Then, to create our Spark context, we transparently pass the spark section to a SparkConf instance. This idiom will instantiate the context with the spark-specific configuration:

  sparkConfig.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))

And we can make use of the config object everywhere else.

We use the override model of the Typesafe config: reasonable defaults go in the reference.conf (within the jar), environment-specific overrides go in the application.conf (alongside the job jar), and hacks are passed with -Dprop=value :-)

-kr, Gerard.

On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com wrote:

I've decided to try:
  spark-submit ... --conf spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties

But when I try to retrieve the value of propertiesFile via

  System.err.println("propertiesFile: " + System.getProperty("propertiesFile"));

I get NULL:

  propertiesFile: null

Interestingly, when I run spark-submit with --verbose, I see that it prints:

  spark.driver.extraJavaOptions -> -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties

I couldn't understand why I couldn't get the value of propertiesFile by using the standard System.getProperty method. (I can use new SparkConf().get("spark.driver.extraJavaOptions") and manually parse it to retrieve the value, but I'd like to know why I cannot retrieve that value using System.getProperty.) Any ideas?

If I can achieve what I've described above properly, I plan to pass a properties file that resides on HDFS, so that it will be available to my driver program wherever that program runs.

-- Emre

On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com wrote:

I haven't actually tried mixing non-Spark settings into the Spark properties. Instead I package my properties into the jar and use the Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala specific) to get at my properties:

Properties file: src/main/resources/integration.conf

(below, $ENV might be set to either integration or prod[3])

  ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
    --conf 'config.resource=$ENV.conf' \
    --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'
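For completeness, a minimal sketch of the Commons Configuration approach that the first message in this thread settled on (Apache Commons Configuration 1.x; how the propertiesFile value reaches the driver is as described above, and the example key name is hypothetical):

  import org.apache.commons.configuration.ConfigurationException;
  import org.apache.commons.configuration.PropertiesConfiguration;

  // the location (e.g. an http:// URL) is handed to the driver as described above
  String location = System.getProperty("propertiesFile");

  PropertiesConfiguration config = new PropertiesConfiguration();
  try {
    config.load(location); // reads the .properties file once at startup
  } catch (ConfigurationException e) {
    throw new RuntimeException("Could not load properties from " + location, e);
  }

  String outputDir = config.getString("validatedJSONoutputDir"); // hypothetical key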
cannot connect to Spark Application Master in YARN
I'm trying to access the Spark UI for an application running through YARN. Clicking on the Application Master under Tracking UI I get an HTTP ERROR 500: HTTP ERROR 500 Problem accessing /proxy/application_1423151769242_0088/. Reason: Connection refused Caused by: java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.init(Socket.java:425) at java.net.Socket.init(Socket.java:280) at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:80) at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:122) at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707) at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387) at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:346) at org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:187) at org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:344) at javax.servlet.http.HttpServlet.service(HttpServlet.java:707) at javax.servlet.http.HttpServlet.service(HttpServlet.java:820) at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66) at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900) at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834) at org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:84) at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795) at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163) at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118) at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:572) at org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter.doFilter(DelegationTokenAuthenticationFilter.java:269) at org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:542) at 
org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter.doFilter(RMAuthenticationFilter.java:84) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1224) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399) at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216) at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182) at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766) at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450) at
Re: cannot connect to Spark Application Master in YARN
Can you track your comments on the existing issue? https://issues.apache.org/jira/browse/SPARK-5837 I personally can't reproduce this but more info would help narrow it down.
Is there a limit to the number of RDDs in a Spark context?
Hi, I'm writing a Spark program where I want to divide an RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for a big number of keys (tens of thousands) the execution gets stuck, without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does that make sense? I'm rewriting the program to use a single RDD of pairs, with cached partitions, but I wanted to be sure I understand the problem here. Thanks a lot in advance, Greetings, Juan Rodriguez
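A rough sketch of the single-pair-RDD shape mentioned at the end of the question, in Java (Record and getKey() are hypothetical stand-ins for the actual record type): instead of building tens of thousands of filtered RDDs, keep one RDD keyed by group, partition it by key, and cache it.

  import org.apache.spark.HashPartitioner;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.function.PairFunction;
  import scala.Tuple2;

  // given: JavaRDD<Record> input
  JavaPairRDD<String, Record> byKey = input.mapToPair(
      new PairFunction<Record, String, Record>() {
        @Override
        public Tuple2<String, Record> call(Record r) {
          return new Tuple2<String, Record>(r.getKey(), r);
        }
      });

  // one RDD for all groups: the scheduler tracks a single lineage instead of
  // one filtered RDD per key
  JavaPairRDD<String, Record> partitioned =
      byKey.partitionBy(new HashPartitioner(200)).cache();

Per-group work can then be expressed with reduceByKey/aggregateByKey or mapPartitions over the partitioned RDD.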
spark 1.2 slower than 1.0 in unit tests
Hi, We're using Spark in our app's unit tests. The tests start a Spark context with local[*], and the test time is now 178 seconds on Spark 1.2 instead of 41 seconds on 1.0. We are using the Spark version from Cloudera CDH (1.2.0-cdh5.3.1). Could you give some hints as to what could cause that, and where to search for a solution? Regards Marcin
[POWERED BY] Can you add Big Industries to the Powered by Spark page?
Hello, Could you please add Big Industries to the Powered by Spark page at https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ? Company Name: Big Industries URL: http://www.bigindustries.be/ Spark Components: Spark Streaming Use Case: Big Content Platform Summary: The Big Content Platform is a business-to-business content asset management service providing a searchable, aggregated source of live news feeds, public domain media and archives of content. The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the platform leverages public datasets like Freebase, DBpedia, Wiktionary, and Geonames to support semantic text enrichment. Kind regards, Emre Sevinç http://www.bigindustries.be/
Re: Problem with 1 master + 2 slaves cluster
But I am able to run the SparkPi example:

  ./run-example SparkPi 1000 --master spark://192.168.26.131:7077

Result: Pi is roughly 3.14173708

bit1...@163.com

From: bit1...@163.com
Date: 2015-02-18 16:29
To: user
Subject: Problem with 1 master + 2 slaves cluster

Hi sparkers, I set up a Spark (1.2.1) cluster with 1 master and 2 slaves, and then started them up; everything looks to be running normally. On the master node, I run the spark-shell with the following steps:

  bin/spark-shell --master spark://192.168.26.131:7077
  scala> var rdd = sc.textFile("file:///home/hadoop/history.txt.used.byspark", 7)
  scala> rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 5).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).saveAsTextFile("file:///home/hadoop/output")

After the application finishes running, there is no word count related output. An output directory does appear on each slave node, but it contains only a _temporary subdirectory. Any ideas? Thanks!
Re: Re: Problem with 1 master + 2 slaves cluster
When you give file://, it requires, while reading, that all slaves have that path/file available locally on their system. It's OK to give file:// when you run your application in local mode (like master=local[*]).

Thanks
Best Regards

On Wed, Feb 18, 2015 at 2:58 PM, Emre Sevinc emre.sev...@gmail.com wrote: On Wed, Feb 18, 2015 at 10:23 AM, bit1...@163.com bit1...@163.com wrote: Sure, thanks Akhil. A further question: Is the local file system (file:///) not supported in a standalone cluster? FYI: I'm able to write to the local file system (via the HDFS API, using file:/// notation) when using Spark. -- Emre Sevinç http://www.bigindustries.be/
Re: Cannot access Spark web UI
It seems like it's not able to get the port it needs. Are you sure the required port is available? In which logs did you find this error?

On Wed, Feb 18, 2015 at 2:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

The error says "Cannot assign requested address". This means that you need to use the correct address for one of your network interfaces, or 0.0.0.0 to accept connections from all interfaces. Can you paste your spark-env.sh and /etc/hosts files?

Thanks
Best Regards
Re: Spark Streaming output cannot be used as input?
To clarify: sometimes in the world of Hadoop people freely refer to an output 'file' when it's really a directory containing 'part-*' files which are pieces of the file. It's imprecise, but that's the meaning. I think the scaladoc may be referring to 'the path to the file, which includes this parent dir, is generated ...'. In an inherently distributed system you want to distribute writes and reads, so big files are really made of logical files within a directory.

There is a JIRA open to support nested dirs which has been languishing: https://issues.apache.org/jira/browse/SPARK-3586 I'm hoping to pursue that again with help from tdas after 1.3. That's probably the best solution.

An alternative is to not use the file system as a sort of message queue, and instead use something like Kafka. It has a lot of other benefits, but maybe it's not feasible to add this to your architecture.

You can merge the files with the HDFS APIs without much trouble. The dirs will be named consistently according to time and are something you can also query for. Making 1 partition has implications for the parallelism of your job.

Emre, I think I see what you're getting at, but you have the map + materialize pattern which I think doesn't have the right guarantees about re-execution. Why not foreachRDD? Yes, you can also consider collecting the whole RDD in foreachRDD and doing what you like, including writing to one file. But that would only work if the data is always small in each RDD.
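To illustrate the "merge the files with the HDFS APIs" route mentioned above, a minimal sketch assuming Hadoop 2.x and hypothetical paths; FileUtil.copyMerge concatenates the part-* files of one batch directory into a single output file:

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.FileUtil;
  import org.apache.hadoop.fs.Path;

  public static void mergeBatchDir(String batchDir, String mergedFile) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // copies and concatenates every file under batchDir into a single file;
    // the last argument is a separator string inserted between files (null for none)
    FileUtil.copyMerge(fs, new Path(batchDir), fs, new Path(mergedFile), false, conf, null);
  }

  // e.g. mergeBatchDir("hdfs:///streaming/out/prefix-1424241000000.suffix",
  //                    "hdfs:///streaming/merged/1424241000000.json");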
Re: Why groupBy is slow?
Thanks Francois for the comment and useful link. I understand the problem better now. best, /Shahab On Wed, Feb 18, 2015 at 10:36 AM, francois.garil...@typesafe.com wrote: In a nutshell : because it’s moving all of your data, compared to other operations (e.g. reduce) that summarize it in one form or another before moving it. For the longer answer: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html — FG On Wed, Feb 18, 2015 at 10:33 AM, shahab shahab.mok...@gmail.com wrote: Hi, Based on what I could see in the Spark UI, I noticed that groupBy transformation is quite slow (taking a lot of time) compared to other operations. Is there any reason that groupBy is slow? shahab
Re: Is spark streaming +MlLib for online learning?
Hi, What is the general consensus/roadmap for implementing additional online / streamed trainable models? Apache Spark 1.2.1 currently supports streaming linear regression and clustering, although other streaming linear methods are planned according to the issue tracker. However, I cannot find any details on the issue tracker about online training of a collaborative filter. Judging from another mailing list discussion http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ce07aa61e-eeb9-4ded-be3e-3f04003e4...@storefront.be%3E incremental training should be possible for ALS. Any plans for the future? Regards mucaho
Re: Problem with 1 master + 2 slaves cluster
Since the cluster is standalone, you are better off reading/writing to HDFS instead of the local filesystem.

Thanks
Best Regards

On Wed, Feb 18, 2015 at 2:32 PM, bit1...@163.com bit1...@163.com wrote: But I am able to run the SparkPi example: ./run-example SparkPi 1000 --master spark://192.168.26.131:7077 Result: Pi is roughly 3.14173708 -- bit1...@163.com
Re: How to pass parameters to a spark-jobserver Scala class?
Hi Sasi, I forgot to mention that the job server uses the Typesafe Config library. The input is JSON; you can find the syntax in the link below: https://github.com/typesafehub/config Regards, Vasu C
Re: Does spark *always* fork its workers?
Forked, meaning different from the driver? Spark will in general not even execute your tasks on the same machine as your driver. The driver can choose to execute a task locally in some cases.

Are you creating non-daemon threads in your function? Your function can and should clean up after itself. Just use try-finally to shut down your pool. Or you can consider whether you can just make the threads daemon threads. There's no separate mechanism; you just write this into your function.

I assume you're looking at something like foreachPartition and mean 'mapper' by way of analogy to MapReduce. This works then. But if you really mean mapPartitions, beware that it is not an action and is lazily evaluated. Also consider not parallelizing manually -- is there really a need for that? It's much simpler to let Spark manage it if possible.
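A minimal sketch of the try/finally cleanup pattern described above, assuming a hypothetical MyClient that owns the non-daemon threads and file handles (Java API, foreachPartition):

  import java.util.Iterator;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.function.VoidFunction;

  // given: JavaRDD<String> rows (e.g. rows read from the Cassandra table)
  rows.foreachPartition(new VoidFunction<Iterator<String>>() {
    @Override
    public void call(Iterator<String> partition) throws Exception {
      MyClient client = new MyClient(); // hypothetical: starts threads, opens file handles
      try {
        while (partition.hasNext()) {
          client.process(partition.next());
        }
      } finally {
        client.shutdown(); // always runs, so no stray threads keep the executor JVM alive
      }
    }
  });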
Re: Why groupBy is slow?
In a nutshell : because it’s moving all of your data, compared to other operations (e.g. reduce) that summarize it in one form or another before moving it. For the longer answer: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html — FG On Wed, Feb 18, 2015 at 10:33 AM, shahab shahab.mok...@gmail.com wrote: Hi, Based on what I could see in the Spark UI, I noticed that groupBy transformation is quite slow (taking a lot of time) compared to other operations. Is there any reason that groupBy is slow? shahab
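To make the linked advice concrete, a small sketch in Java with a hypothetical JavaPairRDD<String, Integer> of (word, 1) pairs: reduceByKey combines values per key on each node before shuffling, while groupByKey ships every value across the network first.

  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.function.Function2;

  // given: JavaPairRDD<String, Integer> pairs, e.g. (word, 1) tuples

  // slower: every single (word, 1) pair is shuffled, then summed afterwards
  // JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();

  // faster: partial sums are computed on the map side, so far less data moves
  JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) {
          return a + b;
        }
      });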
Re: Re: Problem with 1 master + 2 slaves cluster
Sure, thanks Akhil. A further question: Is the local file system (file:///) not supported in a standalone cluster?

bit1...@163.com

From: Akhil Das
Date: 2015-02-18 17:35
To: bit1...@163.com
CC: user
Subject: Re: Problem with 1 master + 2 slaves cluster

Since the cluster is standalone, you are better off reading/writing to HDFS instead of the local filesystem.

Thanks
Best Regards
Why groupBy is slow?
Hi, Based on what I could see in the Spark UI, I noticed that groupBy transformation is quite slow (taking a lot of time) compared to other operations. Is there any reason that groupBy is slow? shahab
issue Running Spark Job on Yarn Cluster
Hi, I want to run my Spark job on a Hadoop YARN cluster in cluster mode. I am using the command below:

  spark-submit --master yarn-cluster --driver-memory 1g --executor-memory 1g --executor-cores 1 --class com.dc.analysis.jobs.AggregationJob sparkanalitic.jar param1 param2 param3

I am getting the error below. Kindly suggest what is going wrong, and whether the command is proper or not. Thanks in advance.

  Exception in thread "main" org.apache.spark.SparkException: Application finished with failed status
    at org.apache.spark.deploy.yarn.ClientBase$class.run(ClientBase.scala:509)
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:35)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:139)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Creating RDDs from within foreachPartition() [Spark-Streaming]
Hi all, I am trying to create RDDs from within rdd.foreachPartition() so I can save these RDDs to Elasticsearch on the fly: stream.foreachRDD(rdd => { rdd.foreachPartition { iterator => { val sc = rdd.context iterator.foreach { case (cid, sid, ts) => { [...] sc.makeRDD(...).saveToEs(...) // throws a NullPointerException (sc is null) } } } } } Unfortunately this doesn't work, as I can't seem to access the SparkContext from anywhere within foreachPartition(). The code above throws a NullPointerException, but if I change it to ssc.sc.makeRDD (where ssc is the StreamingContext object created in the main function, outside of foreachPartition) then I get a NotSerializableException. What is the correct way to do this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-RDDs-from-within-foreachPartition-Spark-Streaming-tp21700.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Job Fails on sortByKey
hi, I have a job that fails on a shuffle during a sortByKey, on a relatively small dataset. http://pastebin.com/raw.php?i=1LxiG4rY
Re: JdbcRDD, ClassCastException with scala.Function0
Thanks, Cody. Yes, I originally started off by looking at that but I get a compile error if I try and use that approach: constructor JdbcRDD in class JdbcRDDT cannot be applied to given types. Not to mention that JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last argument). Wonder if it's a JDK version issue, I'm using 1.7. So I've got this, which doesn't compile JdbcRDDRow jdbcRDD = new JdbcRDDRow( new SparkContext(conf), new JdbcRDD.ConnectionFactory() { public Connection getConnection() throws SQLException { Connection conn = null; try { Class.forName(JDBC_DRIVER); conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD); } catch (ClassNotFoundException ex) { throw new RuntimeException(Error while loading JDBC driver., ex); } return conn; } }, SELECT * FROM EMPLOYEES, 0L, 1000L, 10, new FunctionResultSet, Row() { public Row call(ResultSet r) throws Exception { return null; // have some actual logic here... } }, scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class)); The other approach was mimicing the DbConnection class from this post: http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/. It got around any of the compilation issues but then I got the runtime error where Spark wouldn't recognize the db connection class as a scala.Function0. On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org wrote: Take a look at https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm reading data from a database using JdbcRDD, in Java, and I have an implementation of Function0Connection whose instance I supply as the 'getConnection' parameter into the JdbcRDD constructor. Compiles fine. The definition of the class/function is as follows: public class GetDbConnection extends AbstractFunction0Connection implements Serializable where scala.runtime.AbstractFunction0 extends scala.Function0. At runtime, I get an exception as below. Does anyone have an idea as to how to resolve this/work around it? Thanks. I'm running Spark 1.2.1 built for Hadoop 2.4. 
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): java.lang.ClassCastException: cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to field org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection of type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at
Re: Job Fails on sortByKey
Would you mind explaining your problem a little more specifically, such as the exceptions you ran into, so that someone who has experience with it could give advice? Thanks Jerry 2015-02-19 1:08 GMT+08:00 athing goingon athinggoin...@gmail.com: hi, I have a job that fails on a shuffle during a sortByKey, on a relatively small dataset. http://pastebin.com/raw.php?i=1LxiG4rY
Re: JdbcRDD, ClassCastException with scala.Function0
Take a look at https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm reading data from a database using JdbcRDD, in Java, and I have an implementation of Function0Connection whose instance I supply as the 'getConnection' parameter into the JdbcRDD constructor. Compiles fine. The definition of the class/function is as follows: public class GetDbConnection extends AbstractFunction0Connection implements Serializable where scala.runtime.AbstractFunction0 extends scala.Function0. At runtime, I get an exception as below. Does anyone have an idea as to how to resolve this/work around it? Thanks. I'm running Spark 1.2.1 built for Hadoop 2.4. Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): java.lang.ClassCastException: cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to field org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection of type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context:
Re: JdbcRDD, ClassCastException with scala.Function0
Is sc there a SparkContext or a JavaSparkContext? The compilation error seems to indicate the former, but JdbcRDD.create expects the latter On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I have tried that as well, I get a compile error -- [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for create(SparkContext,anonymous ConnectionFactory,String,int,int,int,anonymous FunctionResultSet,Integer) The code is a copy and paste: JavaRDDInteger jdbcRDD = JdbcRDD.create( sc, new JdbcRDD.ConnectionFactory() { public Connection getConnection() throws SQLException { return DriverManager.getConnection(jdbc:derby:target/JavaJdbcRDDSuiteDb); } }, SELECT DATA FROM FOO WHERE ? = ID AND ID = ?, 1, 100, 1, new FunctionResultSet, Integer() { public Integer call(ResultSet r) throws Exception { return r.getInt(1); } } ); The other thing I've tried was to define a static class locally for GetConnection and use the JdbcCreate constructor. This got around the compile issues but blew up at runtime with NoClassDefFoundError: scala/runtime/AbstractFunction0 ! JdbcRDDRow jdbcRDD = new JdbcRDDRow( sc, (AbstractFunction0Connection) new DbConn(), // had to cast or a compile error SQL_QUERY, 0L, 1000L, 10, new MapRow(), ROW_CLASS_TAG); // DbConn is defined as public static class DbConn extends AbstractFunction0Connection implements Serializable On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote: That test I linked https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90 is calling a static method JdbcRDD.create, not new JdbcRDD. Is that what you tried doing? On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody. Yes, I originally started off by looking at that but I get a compile error if I try and use that approach: constructor JdbcRDD in class JdbcRDDT cannot be applied to given types. Not to mention that JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last argument). Wonder if it's a JDK version issue, I'm using 1.7. So I've got this, which doesn't compile JdbcRDDRow jdbcRDD = new JdbcRDDRow( new SparkContext(conf), new JdbcRDD.ConnectionFactory() { public Connection getConnection() throws SQLException { Connection conn = null; try { Class.forName(JDBC_DRIVER); conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD); } catch (ClassNotFoundException ex) { throw new RuntimeException(Error while loading JDBC driver., ex); } return conn; } }, SELECT * FROM EMPLOYEES, 0L, 1000L, 10, new FunctionResultSet, Row() { public Row call(ResultSet r) throws Exception { return null; // have some actual logic here... } }, scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class)); The other approach was mimicing the DbConnection class from this post: http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/. It got around any of the compilation issues but then I got the runtime error where Spark wouldn't recognize the db connection class as a scala.Function0. 
On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org wrote: Take a look at https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm reading data from a database using JdbcRDD, in Java, and I have an implementation of Function0Connection whose instance I supply as the 'getConnection' parameter into the JdbcRDD constructor. Compiles fine. The definition of the class/function is as follows: public class GetDbConnection extends AbstractFunction0Connection implements Serializable where scala.runtime.AbstractFunction0 extends scala.Function0. At runtime, I get an exception as below. Does anyone have an idea as to how to resolve this/work around it? Thanks. I'm running Spark 1.2.1 built for Hadoop 2.4. Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): java.lang.ClassCastException: cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to field org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection of type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
build spark for cdh5
does anyone have the right maven invocation for cdh5 with yarn? i tried: $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn -DskipTests clean package $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn test it builds and passes tests just fine, but when i deploy on the cluster and try to run SparkPi i get: Caused by: java.lang.VerifyError: class org.apache.hadoop.yarn.proto.YarnServiceProtos$GetApplicationReportRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; so clearly i am doing something wrong. something with protobuf 2.4 versus 2.5 i do not want to use the cloudera version of spark for cdh 5 (it includes the wrong akka version for me) thanks
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
Hello, On Tue, Feb 17, 2015 at 8:53 PM, dgoldenberg dgoldenberg...@gmail.com wrote: I've tried setting spark.files.userClassPathFirst to true in SparkConf in my program, also setting it to true in $SPARK-HOME/conf/spark-defaults.conf as Is the code in question running on the driver or in some executor? spark.files.userClassPathFirst only applies to executors. To override classes in the driver's classpath, you need to modify spark.driver.extraClassPath (or --driver-class-path in spark-submit's command line). In 1.3 there's an option similar to spark.files.userClassPathFirst that works for the driver too. -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: build spark for cdh5
thanks! my bad On Wed, Feb 18, 2015 at 2:00 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Koert, You should be using -Phadoop-2.3 instead of -Phadoop2.3. -Sandy On Wed, Feb 18, 2015 at 10:51 AM, Koert Kuipers ko...@tresata.com wrote: does anyone have the right maven invocation for cdh5 with yarn? i tried: $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn -DskipTests clean package $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn test it builds and passes tests just fine, but when i deploy on cluster and i try to run SparkPi i get: Caused by: java.lang.VerifyError: class org.apache.hadoop.yarn.proto.YarnServiceProtos$GetApplicationReportRequestProto overrides final method getUnknownFields.()Lcom/google/p\ rotobuf/UnknownFieldSet; so clearly i am doing something wrong. something with protobuf 2.4 versus 2.5 i do not want to use the cloudera version of spark for cdh 5 (it includes the wrong akka version for me) thanks
Re: Tableau beta connector
Ashutosh, Were you able to figure this out? I am having the exact same question. I think the answer is to use Spark SQL to create/load a table in Hive (e.g. execute the HiveQL CREATE TABLE statement), but I am not sure. I am hoping for something simpler than that. Anybody? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Tableau-beta-connector-tp21512p21709.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is there a limit to the number of RDDs in a Spark context?
Hi Sean, Thanks a lot for your answer. That explains it, as I was creating thousands of RDDs, so I guess the communication overhead was the reason why the Spark job was freezing. After changing the code to use RDDs of pairs and aggregateByKey it works just fine, and quite fast. Again, thanks a lot for your help. Greetings, Juan 2015-02-18 15:35 GMT+01:00 Sean Owen so...@cloudera.com: At some level, enough RDDs creates hundreds of thousands of tiny partitions of data each of which creates a task for each stage. The raw overhead of all the message passing can slow things down a lot. I would not design something to use an RDD per key. You would generally use key by some value you want to divide and filter on, and then use a *ByKey to do your work. You don't work with individual RDDs this way, but usually that's good news. You usually have a lot more flexibility operating just in pure Java / Scala to do whatever you need inside your function. On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi Paweł, Thanks a lot for your answer. I finally got the program to work by using aggregateByKey, but I was wondering why creating thousands of RDDs doesn't work. I think that could be interesting for using methods that work on RDDs like for example JavaDoubleRDD.stats() ( http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29 ). If the groups are small then I can chain groupBy(), collect(), parallelize() and stats(), but that is quite inefficient because it implies moving data to and from the driver, and also doesn't scale to big groups; on the other hand if I use aggregateByKey or a similar function then I cannot use stats() so I have to reimplement it. In general I was looking for a way to reuse other functions that I have that work on RDDs, for using them on groups of data in a RDD, because I don't see a how to directly apply them to each of the groups in a pair RDD. Again, thanks a lot for your answer, Greetings, Juan Rodriguez 2015-02-18 14:56 GMT+01:00 Paweł Szulc paul.sz...@gmail.com: Maybe you can omit using grouping all together with groupByKey? What is your next step after grouping elements by key? Are you trying to reduce values? If so then I would recommend using some reducing functions like for example reduceByKey or aggregateByKey. Those will first reduce value for each key locally on each node before doing actual IO over the network. There will also be no grouping phase so you will not run into memory issues. Please let me know if that helped Pawel Szulc @rabbitonweb http://www.rabbitonweb.com On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, I'm writing a Spark program where I want to divide a RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for big number of keys (tens of thousands) the execution gets stuck, without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does it make sense? I'm rewriting the program to use a single RDD of pairs, with cached partions, but I wanted to be sure I understand the problem here. Thanks a lot in advance, Greetings, Juan Rodriguez
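[Editor's note] As a concrete illustration of the aggregateByKey approach discussed above, here is a minimal Java sketch (not from the thread) that computes per-key statistics with Spark's StatCounter, giving the same count/mean/stdev information as JavaDoubleRDD.stats() but for every key in a single pass; the pairs variable is an assumed JavaPairRDD<String, Double> keyed by group:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.util.StatCounter;

// One StatCounter accumulator per key; values are merged locally within each
// partition before anything is shuffled, so large groups never have to be
// collected to the driver.
JavaPairRDD<String, StatCounter> statsPerKey = pairs.aggregateByKey(
    new StatCounter(),                   // zero value (a fresh copy is used per key)
    (acc, value) -> acc.merge(value),    // fold one value into the accumulator
    (acc1, acc2) -> acc1.merge(acc2));   // combine accumulators across partitions

With Java 7, the two lambdas become Function2 implementations; the idea is unchanged.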
Spark data incorrect when more than 200 tasks
I'm fairly new to Spark. We have data in Avro files on HDFS. We are trying to load up all the Avro files (28 GB worth right now) and do an aggregation. When we have fewer than 200 tasks the data all runs and produces the proper results. If there are more than 200 tasks (as stated in the logs by the TaskSetManager) the data seems to group only by the first record of each Avro file when the RDD is read from HDFS. If I set spark.shuffle.sort.bypassMergeThreshold to greater than 200, the data seems to come out correctly. I don't understand why or how. Here are the relevant code pieces: JavaSparkContext context = new JavaSparkContext( new SparkConf() .setAppName(AnalyticsJob.class.getSimpleName()) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") ); context.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); context.hadoopConfiguration().set("mapreduce.input.fileinputformat.inputdir", job.inputDirectory); JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>) context.newAPIHadoopRDD( context.hadoopConfiguration(), AvroKeyInputFormat.class, AvroKey.class, NullWritable.class ).keys()) .map(event -> event.datum()) .filter(key -> { return Optional.ofNullable(key.getStepEventKey()).isPresent(); }) .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1)) .groupByKey() .map(tuple -> tuple._1()); events.persist(StorageLevel.MEMORY_AND_DISK_2()); If I do a collect on events at this point the data is not as expected and is jumbled, so when we pass it on to the next job in our pipeline for aggregation the data doesn't come out as expected. The downstream task maps to pairs again and stores the result in the db. Thanks in advance for the help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-data-incorrect-when-more-than-200-tasks-tp21710.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
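[Editor's note] One frequent cause of jumbled results in this kind of pipeline (offered here only as a hedged guess, since the thread does not confirm it) is that Hadoop record readers, including Avro's, reuse the same object for every record, so persisting or shuffling the datum without copying it leaves many references to whichever record was read last. A minimal sketch of the usual defensive copy, assuming AnalyticsEvent is an Avro-generated SpecificRecord class and avroKeys stands for the JavaRDD<AvroKey<AnalyticsEvent>> obtained from newAPIHadoopRDD(...).keys() in the snippet above:

import org.apache.avro.mapred.AvroKey;
import org.apache.spark.api.java.JavaRDD;

// Copy each datum into a fresh object before it is cached or shuffled, so the
// records are not aliased to the input format's reused buffer.
JavaRDD<AnalyticsEvent> events = avroKeys
    .map(key -> AnalyticsEvent.newBuilder(key.datum()).build());

If object reuse is the culprit, grouping then sees genuinely distinct records, which may be why raising spark.shuffle.sort.bypassMergeThreshold (which switches shuffle implementations) only appears to mask the problem.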
Re: JdbcRDD, ClassCastException with scala.Function0
That test I linked https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90 is calling a static method JdbcRDD.create, not new JdbcRDD. Is that what you tried doing? On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody. Yes, I originally started off by looking at that but I get a compile error if I try and use that approach: constructor JdbcRDD in class JdbcRDDT cannot be applied to given types. Not to mention that JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last argument). Wonder if it's a JDK version issue, I'm using 1.7. So I've got this, which doesn't compile JdbcRDDRow jdbcRDD = new JdbcRDDRow( new SparkContext(conf), new JdbcRDD.ConnectionFactory() { public Connection getConnection() throws SQLException { Connection conn = null; try { Class.forName(JDBC_DRIVER); conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD); } catch (ClassNotFoundException ex) { throw new RuntimeException(Error while loading JDBC driver., ex); } return conn; } }, SELECT * FROM EMPLOYEES, 0L, 1000L, 10, new FunctionResultSet, Row() { public Row call(ResultSet r) throws Exception { return null; // have some actual logic here... } }, scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class)); The other approach was mimicing the DbConnection class from this post: http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/. It got around any of the compilation issues but then I got the runtime error where Spark wouldn't recognize the db connection class as a scala.Function0. On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org wrote: Take a look at https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm reading data from a database using JdbcRDD, in Java, and I have an implementation of Function0Connection whose instance I supply as the 'getConnection' parameter into the JdbcRDD constructor. Compiles fine. The definition of the class/function is as follows: public class GetDbConnection extends AbstractFunction0Connection implements Serializable where scala.runtime.AbstractFunction0 extends scala.Function0. At runtime, I get an exception as below. Does anyone have an idea as to how to resolve this/work around it? Thanks. I'm running Spark 1.2.1 built for Hadoop 2.4. 
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): java.lang.ClassCastException: cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to field org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection of type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at
Re: JdbcRDD, ClassCastException with scala.Function0
I have tried that as well, I get a compile error -- [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for create(SparkContext,anonymous ConnectionFactory,String,int,int,int,anonymous FunctionResultSet,Integer) The code is a copy and paste: JavaRDDInteger jdbcRDD = JdbcRDD.create( sc, new JdbcRDD.ConnectionFactory() { public Connection getConnection() throws SQLException { return DriverManager.getConnection(jdbc:derby:target/JavaJdbcRDDSuiteDb); } }, SELECT DATA FROM FOO WHERE ? = ID AND ID = ?, 1, 100, 1, new FunctionResultSet, Integer() { public Integer call(ResultSet r) throws Exception { return r.getInt(1); } } ); The other thing I've tried was to define a static class locally for GetConnection and use the JdbcCreate constructor. This got around the compile issues but blew up at runtime with NoClassDefFoundError: scala/runtime/AbstractFunction0 ! JdbcRDDRow jdbcRDD = new JdbcRDDRow( sc, (AbstractFunction0Connection) new DbConn(), // had to cast or a compile error SQL_QUERY, 0L, 1000L, 10, new MapRow(), ROW_CLASS_TAG); // DbConn is defined as public static class DbConn extends AbstractFunction0Connection implements Serializable On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote: That test I linked https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90 is calling a static method JdbcRDD.create, not new JdbcRDD. Is that what you tried doing? On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody. Yes, I originally started off by looking at that but I get a compile error if I try and use that approach: constructor JdbcRDD in class JdbcRDDT cannot be applied to given types. Not to mention that JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last argument). Wonder if it's a JDK version issue, I'm using 1.7. So I've got this, which doesn't compile JdbcRDDRow jdbcRDD = new JdbcRDDRow( new SparkContext(conf), new JdbcRDD.ConnectionFactory() { public Connection getConnection() throws SQLException { Connection conn = null; try { Class.forName(JDBC_DRIVER); conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD); } catch (ClassNotFoundException ex) { throw new RuntimeException(Error while loading JDBC driver., ex); } return conn; } }, SELECT * FROM EMPLOYEES, 0L, 1000L, 10, new FunctionResultSet, Row() { public Row call(ResultSet r) throws Exception { return null; // have some actual logic here... } }, scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class)); The other approach was mimicing the DbConnection class from this post: http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/. It got around any of the compilation issues but then I got the runtime error where Spark wouldn't recognize the db connection class as a scala.Function0. On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org wrote: Take a look at https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm reading data from a database using JdbcRDD, in Java, and I have an implementation of Function0Connection whose instance I supply as the 'getConnection' parameter into the JdbcRDD constructor. Compiles fine. 
The definition of the class/function is as follows: public class GetDbConnection extends AbstractFunction0Connection implements Serializable where scala.runtime.AbstractFunction0 extends scala.Function0. At runtime, I get an exception as below. Does anyone have an idea as to how to resolve this/work around it? Thanks. I'm running Spark 1.2.1 built for Hadoop 2.4. Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): java.lang.ClassCastException: cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to field org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection of type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at
Spark and Spark Streaming code sharing best practice.
Hey, It seems pretty clear that one of the strengths of Spark is being able to share your code between your batch and streaming layers. Though, given that Spark Streaming uses a DStream (a sequence of RDDs) while batch Spark uses a single RDD, there might be some complexity associated with it. Of course, since a DStream is just a sequence of RDDs, one can run the same code at the RDD granularity using DStream::foreachRDD. While this should work for map, I am not sure how it can work for the reduce phase, given that a group of keys may span multiple RDDs. One of the options is to change the dataset object a job works on. For example, instead of passing an RDD to a class method, one passes a higher-level object (MetaRDD) that wraps either an RDD or a DStream depending on the context. At that point the job calls its regular maps, reduces and so on, and the MetaRDD wrapper delegates accordingly. I would just like to know the official best practice from the Spark community though. Thanks,
Re: Spark and Spark Streaming code sharing best practice.
I find monoids pretty useful in this respect, basically separating out the logic in a monoid and then applying the logic to either a stream or a batch. A list of such practices could be really useful. On Thu, Feb 19, 2015 at 12:26 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, It seems pretty clear that one of the strength of Spark is to be able to share your code between your batch and streaming layer. Though, given that Spark streaming uses DStream being a set of RDDs and Spark uses a single RDD there might some complexity associated with it. Of course since DStream is a superset of RDDs, one can just run the same code at the RDD granularity using DStream::forEachRDD. While this should work for map, I am not sure how that can work when it comes to reduce phase given that a group of keys spans across multiple RDDs. One of the option is to change the dataset object on which a job works on. For example of passing an RDD to a class method, one passes a higher level object (MetaRDD) that wraps around RDD or DStream depending the context. At this point the job calls its regular maps, reduces and so on and the MetaRDD wrapper would delegate accordingly. Just would like to know the official best practice from the spark community though. Thanks, -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
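[Editor's note] To illustrate the pattern these replies point at, here is a minimal Java sketch (the class, method, and variable names are illustrative assumptions, not from the thread): the shared logic is an ordinary named function over RDDs, and the streaming layer applies it to every micro-batch via transform.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public final class SharedLogic {
    // The same code path serves the batch job and each micro-batch of a stream.
    public static JavaPairRDD<String, Long> countWords(JavaRDD<String> lines) {
        return lines
            .mapToPair(word -> new Tuple2<>(word, 1L))
            .reduceByKey((a, b) -> a + b);
    }

    // Batch driver:     JavaPairRDD<String, Long> counts = SharedLogic.countWords(batchLines);
    // Streaming driver: JavaPairDStream<String, Long> counts =
    //                        streamLines.transformToPair(rdd -> SharedLogic.countWords(rdd));
}

Note that a reduceByKey applied this way aggregates within one micro-batch only; aggregating a key across batches needs updateStateByKey or window operations, which is exactly the reduce-phase concern raised in the original question.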
Hamburg Apache Spark Meetup
If you could also add the Hamburg Apache Spark Meetup, I'd appreciate it. http://www.meetup.com/Hamburg-Apache-Spark-Meetup/ On Tue, Feb 17, 2015 at 5:08 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Thanks! I've added you. Matei On Feb 17, 2015, at 4:06 PM, Ralph Bergmann | the4thFloor.eu ra...@the4thfloor.eu wrote: Hi, there is a small Spark Meetup group in Berlin, Germany :-) http://www.meetup.com/Berlin-Apache-Spark-Meetup/ Please add this group to the Meetups list at https://spark.apache.org/community.html Ralph - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: build spark for cdh5
Hi Koert, You should be using -Phadoop-2.3 instead of -Phadoop2.3. -Sandy On Wed, Feb 18, 2015 at 10:51 AM, Koert Kuipers ko...@tresata.com wrote: does anyone have the right maven invocation for cdh5 with yarn? i tried: $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn -DskipTests clean package $ mvn -Phadoop2.3 -Dhadoop.version=2.5.0-cdh5.2.3 -Pyarn test it builds and passes tests just fine, but when i deploy on cluster and i try to run SparkPi i get: Caused by: java.lang.VerifyError: class org.apache.hadoop.yarn.proto.YarnServiceProtos$GetApplicationReportRequestProto overrides final method getUnknownFields.()Lcom/google/p\ rotobuf/UnknownFieldSet; so clearly i am doing something wrong. something with protobuf 2.4 versus 2.5 i do not want to use the cloudera version of spark for cdh 5 (it includes the wrong akka version for me) thanks
ML Transformer
I am working right now with the ML pipeline, which I really like. However, in order to make real use of it, I would like to create my own transformers that implement org.apache.spark.ml.Transformer. In order to do that, a method from PipelineStage needs to be implemented, but this method is private to the ml package: private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType Can users create their own transformers? If not, will this functionality be added in the future? Thanks -- Cesar Flores
NotSerializableException: org.apache.http.impl.client.DefaultHttpClient when trying to send documents to Solr
I'm using SolrJ in a Spark program. When I try to send the docs to Solr, I get the NotSerializableException on the DefaultHttpClient. Is there a possible fix or workaround? I'm using Spark 1.2.1 with Hadoop 2.4, SolrJ is version 4.0.0. final HttpSolrServer solrServer = new HttpSolrServer(SOLR_SERVER_URL); ... JavaRDD<SolrInputDocument> solrDocs = rdd.map(new Function<Row, SolrInputDocument>() { public SolrInputDocument call(Row r) { return r.toSolrDocument(); } }); solrDocs.foreachPartition(new VoidFunction<Iterator<SolrInputDocument>>() { public void call(Iterator<SolrInputDocument> solrDocIterator) throws Exception { List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>(); while (solrDocIterator.hasNext()) { SolrInputDocument inputDoc = solrDocIterator.next(); batch.add(inputDoc); if (batch.size() >= batchSize) { Utils.sendBatchToSolr(solrServer, solrCollection, batch); } } if (!batch.isEmpty()) { Utils.sendBatchToSolr(solrServer, solrCollection, batch); } } }); Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1478) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:789) at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:195) at org.apache.spark.api.java.JavaRDD.foreachPartition(JavaRDD.scala:32) at com.kona.motivis.spark.proto.SparkProto.execute(SparkProto.java:158) at com.kona.motivis.spark.proto.SparkProto.main(SparkProto.java:186) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: org.apache.http.impl.client.DefaultHttpClient at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) --
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-org-apache-http-impl-client-DefaultHttpClient-when-trying-to-send-documentsr-tp21713.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
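[Editor's note] A common workaround for this class of error (offered here as a hedged sketch, not as what the thread ultimately settled on) is to avoid capturing the non-serializable client in the closure at all: construct the HttpSolrServer inside foreachPartition, so it is created on each executor rather than serialized from the driver. This assumes SOLR_SERVER_URL and batchSize are serializable, effectively final values available to the closure, and that the surrounding code matches the snippet above:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.api.java.function.VoidFunction;

solrDocs.foreachPartition(new VoidFunction<Iterator<SolrInputDocument>>() {
    public void call(Iterator<SolrInputDocument> docs) throws Exception {
        // Created per partition on the executor, so it never needs to be serialized.
        HttpSolrServer solrServer = new HttpSolrServer(SOLR_SERVER_URL);
        List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>();
        while (docs.hasNext()) {
            batch.add(docs.next());
            if (batch.size() >= batchSize) {
                solrServer.add(batch);   // or the Utils.sendBatchToSolr(...) helper from the post
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            solrServer.add(batch);
        }
        solrServer.commit();
        solrServer.shutdown();
    }
});

If SOLR_SERVER_URL and batchSize are instance fields of a non-serializable driver class, copy them into local final variables first so the anonymous function does not capture the enclosing object.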
Re: JsonRDD to parquet -- data loss
Concurrent inserts into the same table are not supported. I can try to make this clearer in the documentation. On Tue, Feb 17, 2015 at 8:01 PM, Vasu C vasuc.bigd...@gmail.com wrote: Hi, I am running spark batch processing job using spark-submit command. And below is my code snippet. Basically converting JsonRDD to parquet and storing it in HDFS location. The problem I am facing is if multiple jobs are are triggered parallely, even though job executes properly (as i can see in spark webUI), there is no parquet file created in hdfs path. If 5 jobs are executed parallely than only 3 parquet files are getting created. Is this the data loss scenario ? Or am I missing something here. Please help me in this Here tableName is unique with timestamp appended to it. val sqlContext = new org.apache.spark.sql.SQLContext(sc) val jsonRdd = sqlContext.jsonRDD(results) val parquetTable = sqlContext.parquetFile(parquetFilePath) parquetTable.registerTempTable(tableName) jsonRdd.insertInto(tableName) Regards, Vasu C
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
I'm not sure what on the driver means but I've tried setting spark.files.userClassPathFirst to true, in $SPARK-HOME/conf/spark-defaults.conf and also in the SparkConf programmatically; it appears to be ignored. The solution was to follow Emre's recommendation and downgrade the selected Solrj distro to 4.0.0. That did the trick as it appears to be using the same HttpClient as one used by Spark/Hadoop. The Spark program I'm running is a jar I submit via a spark-submit invokation. On Wed, Feb 18, 2015 at 1:57 PM, Marcelo Vanzin van...@cloudera.com wrote: Hello, On Tue, Feb 17, 2015 at 8:53 PM, dgoldenberg dgoldenberg...@gmail.com wrote: I've tried setting spark.files.userClassPathFirst to true in SparkConf in my program, also setting it to true in $SPARK-HOME/conf/spark-defaults.conf as Is the code in question running on the driver or in some executor? spark.files.userClassPathFirst only applies to executors. To override classes in the driver's classpath, you need to modify spark.driver.extraClassPath (or --driver-class-path in spark-submit's command line). In 1.3 there's an option similar to spark.files.userClassPathFirst that works for the driver too. -- Marcelo
Re: JdbcRDD, ClassCastException with scala.Function0
Cody, you were right, I had a copy and paste snag where I ended up with a vanilla SparkContext rather than a Java one. I also had to *not* use my function subclasses, rather just use anonymous inner classes for the Function stuff and that got things working. I'm fully following the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim. Is there a clean way to refactor out the custom Function classes such as the one for getting a db connection or mapping ResultSet data to your own POJO's rather than doing it all inline? On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org wrote: Is sc there a SparkContext or a JavaSparkContext? The compilation error seems to indicate the former, but JdbcRDD.create expects the latter On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I have tried that as well, I get a compile error -- [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for create(SparkContext,anonymous ConnectionFactory,String,int,int,int,anonymous FunctionResultSet,Integer) The code is a copy and paste: JavaRDDInteger jdbcRDD = JdbcRDD.create( sc, new JdbcRDD.ConnectionFactory() { public Connection getConnection() throws SQLException { return DriverManager.getConnection(jdbc:derby:target/JavaJdbcRDDSuiteDb); } }, SELECT DATA FROM FOO WHERE ? = ID AND ID = ?, 1, 100, 1, new FunctionResultSet, Integer() { public Integer call(ResultSet r) throws Exception { return r.getInt(1); } } ); The other thing I've tried was to define a static class locally for GetConnection and use the JdbcCreate constructor. This got around the compile issues but blew up at runtime with NoClassDefFoundError: scala/runtime/AbstractFunction0 ! JdbcRDDRow jdbcRDD = new JdbcRDDRow( sc, (AbstractFunction0Connection) new DbConn(), // had to cast or a compile error SQL_QUERY, 0L, 1000L, 10, new MapRow(), ROW_CLASS_TAG); // DbConn is defined as public static class DbConn extends AbstractFunction0Connection implements Serializable On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote: That test I linked https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90 is calling a static method JdbcRDD.create, not new JdbcRDD. Is that what you tried doing? On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody. Yes, I originally started off by looking at that but I get a compile error if I try and use that approach: constructor JdbcRDD in class JdbcRDDT cannot be applied to given types. Not to mention that JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last argument). Wonder if it's a JDK version issue, I'm using 1.7. So I've got this, which doesn't compile JdbcRDDRow jdbcRDD = new JdbcRDDRow( new SparkContext(conf), new JdbcRDD.ConnectionFactory() { public Connection getConnection() throws SQLException { Connection conn = null; try { Class.forName(JDBC_DRIVER); conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD); } catch (ClassNotFoundException ex) { throw new RuntimeException(Error while loading JDBC driver., ex); } return conn; } }, SELECT * FROM EMPLOYEES, 0L, 1000L, 10, new FunctionResultSet, Row() { public Row call(ResultSet r) throws Exception { return null; // have some actual logic here... 
} }, scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class)); The other approach was mimicing the DbConnection class from this post: http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/. It got around any of the compilation issues but then I got the runtime error where Spark wouldn't recognize the db connection class as a scala.Function0. On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org wrote: Take a look at https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm reading data from a database using JdbcRDD, in Java, and I have an implementation of Function0Connection whose instance I supply as the 'getConnection' parameter into the JdbcRDD constructor. Compiles fine. The definition of the class/function is as follows: public class GetDbConnection extends AbstractFunction0Connection implements Serializable where scala.runtime.AbstractFunction0 extends scala.Function0. At runtime, I get an exception as below. Does anyone have an idea as to how to resolve this/work around it? Thanks. I'm running Spark 1.2.1 built for Hadoop 2.4. Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0
Spark can't pickle class: error cannot lookup attribute
Hi, This is a duplicate of the Stack Overflow question here: http://stackoverflow.com/questions/28569374/spark-returning-pickle-error-cannot-lookup-attribute. I hope to generate more interest on this mailing list. The problem: I am running into some attribute lookup problems when trying to instantiate a class within my RDD. My workflow is quite standard: 1- Start with an RDD 2- Take each element of the RDD and instantiate an object for each 3- Reduce (I will write a method that will define the reduce operation later on) Here is #2: class test(object): def __init__(self, a, b): self.total = a + b a = sc.parallelize([(True,False),(False,False)]) a.map(lambda (x,y): test(x,y)) Here is the error I get: PicklingError: Can't pickle class '__main__.test': attribute lookup __main__.test failed I'd like to know if there is any way around it. Please answer with a working example that achieves the intended result (i.e. creating an RDD of objects of class test). Thanks in advance! Related question: - https://groups.google.com/forum/#!topic/edx-code/9xzRJFyQwn GG
Thriftserver Beeline
Hi, I created some Hive tables and am trying to list them through Beeline, but I am not getting any results. I can list the tables through spark-sql. When I connect with Beeline, it starts up with the following message: Connecting to jdbc:hive2://tst001:10001 Enter username for jdbc:hive2://tst001:10001: Enter password for jdbc:hive2://tst001:10001: Connected to: Spark SQL (version 1.2.0) Driver: null (version null) Transaction isolation: TRANSACTION_REPEATABLE_READ Running show tables in Beeline returns no results. Regards, Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Thriftserver-Beeline-tp21712.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: JdbcRDD, ClassCastException with scala.Function0
Cant you implement the org.apache.spark.api.java.function.Function interface and pass an instance of that to JdbcRDD.create ? On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Cody, you were right, I had a copy and paste snag where I ended up with a vanilla SparkContext rather than a Java one. I also had to *not* use my function subclasses, rather just use anonymous inner classes for the Function stuff and that got things working. I'm fully following the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim. Is there a clean way to refactor out the custom Function classes such as the one for getting a db connection or mapping ResultSet data to your own POJO's rather than doing it all inline? On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org wrote: Is sc there a SparkContext or a JavaSparkContext? The compilation error seems to indicate the former, but JdbcRDD.create expects the latter On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I have tried that as well, I get a compile error -- [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for create(SparkContext,anonymous ConnectionFactory,String,int,int,int,anonymous FunctionResultSet,Integer) The code is a copy and paste: JavaRDDInteger jdbcRDD = JdbcRDD.create( sc, new JdbcRDD.ConnectionFactory() { public Connection getConnection() throws SQLException { return DriverManager.getConnection(jdbc:derby:target/JavaJdbcRDDSuiteDb); } }, SELECT DATA FROM FOO WHERE ? = ID AND ID = ?, 1, 100, 1, new FunctionResultSet, Integer() { public Integer call(ResultSet r) throws Exception { return r.getInt(1); } } ); The other thing I've tried was to define a static class locally for GetConnection and use the JdbcCreate constructor. This got around the compile issues but blew up at runtime with NoClassDefFoundError: scala/runtime/AbstractFunction0 ! JdbcRDDRow jdbcRDD = new JdbcRDDRow( sc, (AbstractFunction0Connection) new DbConn(), // had to cast or a compile error SQL_QUERY, 0L, 1000L, 10, new MapRow(), ROW_CLASS_TAG); // DbConn is defined as public static class DbConn extends AbstractFunction0Connection implements Serializable On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote: That test I linked https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90 is calling a static method JdbcRDD.create, not new JdbcRDD. Is that what you tried doing? On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody. Yes, I originally started off by looking at that but I get a compile error if I try and use that approach: constructor JdbcRDD in class JdbcRDDT cannot be applied to given types. Not to mention that JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last argument). Wonder if it's a JDK version issue, I'm using 1.7. So I've got this, which doesn't compile JdbcRDDRow jdbcRDD = new JdbcRDDRow( new SparkContext(conf), new JdbcRDD.ConnectionFactory() { public Connection getConnection() throws SQLException { Connection conn = null; try { Class.forName(JDBC_DRIVER); conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD); } catch (ClassNotFoundException ex) { throw new RuntimeException(Error while loading JDBC driver., ex); } return conn; } }, SELECT * FROM EMPLOYEES, 0L, 1000L, 10, new FunctionResultSet, Row() { public Row call(ResultSet r) throws Exception { return null; // have some actual logic here... 
} }, scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class)); The other approach was mimicing the DbConnection class from this post: http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/. It got around any of the compilation issues but then I got the runtime error where Spark wouldn't recognize the db connection class as a scala.Function0. On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org wrote: Take a look at https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm reading data from a database using JdbcRDD, in Java, and I have an implementation of Function0Connection whose instance I supply as the 'getConnection' parameter into the JdbcRDD constructor. Compiles fine. The definition of the class/function is as follows: public class GetDbConnection extends AbstractFunction0Connection implements Serializable where scala.runtime.AbstractFunction0 extends scala.Function0. At runtime, I get an exception as below. Does anyone have
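[Editor's note] For what it's worth, the refactoring asked about a few messages up can be done with named top-level (or static nested) classes that implement Spark's Java function interfaces, which JdbcRDD.create accepts just like anonymous inner classes. A hedged sketch; the SQL, the Employee POJO, and the JDBC_* constants are placeholders, not taken from the thread:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

// Named connection factory; JdbcRDD.ConnectionFactory extends Serializable,
// so a top-level or static nested class works.
class EmployeeConnectionFactory implements JdbcRDD.ConnectionFactory {
    public Connection getConnection() throws Exception {
        return DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
    }
}

// Named row mapper instead of an anonymous Function<ResultSet, Employee>.
class EmployeeRowMapper implements Function<ResultSet, Employee> {
    public Employee call(ResultSet rs) throws Exception {
        return new Employee(rs.getLong(1), rs.getString(2));
    }
}

// In the driver, with a JavaSparkContext jsc:
JavaRDD<Employee> employees = JdbcRDD.create(
    jsc,
    new EmployeeConnectionFactory(),
    "SELECT ID, NAME FROM EMPLOYEES WHERE ? <= ID AND ID <= ?",
    0L, 1000L, 10,
    new EmployeeRowMapper());

Using the create() overload with these Java interfaces sidesteps the scala.Function0 machinery that was involved in the original ClassCastException.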
spark slave cannot execute without admin permission on windows
Hi, Is it possible to configure Spark to run without admin permission on Windows? My current setup runs master and slave successfully with admin permission. However, if I downgrade the permission level from admin to user, SparkPi fails with the following exception on the slave node: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 9, workernode0.jnashsparkcurr2.d10.internal.cloudapp.net): java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi$$anonfun$1 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) Upon investigation, it appears that the SparkPi jar under spark_home\worker\appname\*.jar does not have execute permission set, causing Spark to be unable to find the class. Advice would be very much appreciated. Thanks, Judy
Re: Spark and Spark Streaming code sharing best practice.
Monoids are useful in Aggregations and try avoiding Anonymous functions, creating out functions out of the spark code allows the functions to be reused(Possibly between Spark and Spark Streaming) On Thu, Feb 19, 2015 at 6:56 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Thanks Arush. I will check that out. On Wed, Feb 18, 2015 at 11:06 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: I find monoids pretty useful in this respect, basically separating out the logic in a monoid and then applying the logic to either a stream or a batch. A list of such practices could be really useful. On Thu, Feb 19, 2015 at 12:26 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, It seems pretty clear that one of the strength of Spark is to be able to share your code between your batch and streaming layer. Though, given that Spark streaming uses DStream being a set of RDDs and Spark uses a single RDD there might some complexity associated with it. Of course since DStream is a superset of RDDs, one can just run the same code at the RDD granularity using DStream::forEachRDD. While this should work for map, I am not sure how that can work when it comes to reduce phase given that a group of keys spans across multiple RDDs. One of the option is to change the dataset object on which a job works on. For example of passing an RDD to a class method, one passes a higher level object (MetaRDD) that wraps around RDD or DStream depending the context. At this point the job calls its regular maps, reduces and so on and the MetaRDD wrapper would delegate accordingly. Just would like to know the official best practice from the spark community though. Thanks, -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
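A minimal sketch of that pattern with the Java API (the class name CountByKeyLogic and the paths are made up for illustration): keep the per-RDD logic in one named place, call it directly in the batch job, and call the same method from foreachRDD in the streaming job:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    public class CountByKeyLogic {
      // the shared transformation: works on any RDD of lines, whether it comes
      // from a batch textFile() or from one micro-batch of a DStream
      public static JavaPairRDD<String, Integer> run(JavaRDD<String> lines) {
        return lines
            .mapToPair(new PairFunction<String, String, Integer>() {
              public Tuple2<String, Integer> call(String line) {
                return new Tuple2<String, Integer>(line.split(",")[0], 1); // key = first CSV field
              }
            })
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
              public Integer call(Integer a, Integer b) { return a + b; }
            });
      }
    }

    // batch job:
    CountByKeyLogic.run(sc.textFile("hdfs:///input")).saveAsTextFile("hdfs:///out/batch");

    // streaming job: the same code applied to every micro-batch
    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) {
        CountByKeyLogic.run(rdd).saveAsTextFile("hdfs:///out/" + System.currentTimeMillis());
        return null;
      }
    });

The reduce step here is associative, which is what Arush's monoid framing captures: any logic whose combine operation is a monoid can be folded over a single RDD or over a stream of RDDs in exactly the same way.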
Re: spark slave cannot execute without admin permission on windows
You need not require admin permission, but just make sure all those jars has execute permission ( read/write access) Thanks Best Regards On Thu, Feb 19, 2015 at 11:30 AM, Judy Nash judyn...@exchange.microsoft.com wrote: Hi, Is it possible to configure spark to run without admin permission on windows? My current setup run master slave successfully with admin permission. However, if I downgrade permission level from admin to user, SparkPi fails with the following exception on the slave node: Exception in thread main org.apache.spark.SparkException: Job aborted due to s tage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 9, workernode0.jnashsparkcurr2.d10.internal.cloudapp.net) : java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi$$anonfun$1 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) Upon investigation, it appears that sparkPi jar under spark_home\worker\appname\*.jar does not have execute permission set, causing spark not able to find class. Advice would be very much appreciated. Thanks, Judy
Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Thanks Imran, I'll try your suggestions. I eventually got this to run by 'checkpointing' the joined RDD (according to Akhil's suggestion) before performing the reduceBy, and then checkpointing it again afterward. i.e. val rdd2 = rdd.join(rdd, numPartitions=1000) .map(fp=((fp._2._1, fp._2._2), 1)) .persist(MEMORY_AND_DISK_SER) val rdd3 = rdd2.reduceByKey((x,y)=x+y).persist(MEMORY_AND_DISK_SER) rdd3.count() It takes a while, but at least it runs. So, I'll be sure to try your suggestions for further speed-up. Thanks again for your help. On 18 February 2015 at 18:47, Imran Rashid iras...@cloudera.com wrote: Hi Tom, there are a couple of things you can do here to make this more efficient. first, I think you can replace your self-join with a groupByKey. on your example data set, this would give you (1, Iterable(2,3)) (4, Iterable(3)) this reduces the amount of data that needs to be shuffled, and that way you can produce all of your pairs just from the Iterable(2,3). second, if you expect the same pairs to appear many times in your dataset, you might first want to replace them with a count. eg., if you start with (1,2) (1,2) (1,2) ... (1,2) (1,3) (1,3) (4,3) ... you might want to first convert that to get a count of each pair val pairCounts = rdd.map{x = (x,1)}.reduceByKey{_ + _} to give you something like: ((1,2), 145) ((1,3), 2) ((4,3), 982) ... and then with a little more massaging you can group by key and also keep the counts of each item: val groupedCounts: RDD[(Int, Iterable[(Int,Int)])] = pairCounts.map{case((key, value), counts) = key - (value,counts) }.groupByKey which would give you something like (1, Iterable((2,145), (3, 2)) (4, Iterable((3, 982)) hope this helps Imran On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote: Thanks for the reply, I'll try your suggestions. Apologies, in my previous post I was mistaken. rdd is actually an PairRDD of (Int, Int). I'm doing the self-join so I can count two things. First, I can count the number of times a value appears in the data set. Second I can count number of times values occur with the same key. For example, if I have the following: (1,2) (1,3) (4,3) Then joining with itself I get: (1,(2,2)) - map - ((2,2),1) - reduceByKey - ((2,2),1) (1,(2,3)) - map - ((2,3),1) - reduceByKey - ((2,3),1) (1,(3,2)) - map - ((3,2),1) - reduceByKey - ((3,2),1) (1,(3,3)) - map - ((3,3),1) - reduceByKey - ((3,3),2) (4,(3,3)) - map - ((3,3),1) _| Note that I want to keep the duplicates (2,2) and reflections. Rgds On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote: Why are you joining the rdd with itself? You can try these things: - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or MEMORY_AND_DISK_SER, so that it doesnt need to keep everything up in memory. - Set your default Serializer to Kryo (.set(spark.serializer, org.apache.spark.serializer.KryoSerializer)) - Enable rdd compression (.set(spark.rdd.compress,true)) Thanks Best Regards On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote: Hi All, I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing a OOM exception in the executor after about 125/1000 tasks during the map stage. 
val rdd2 = rdd.join(rdd, numPartitions=1000) .map(fp=((fp._2._1, fp._2._2), 1)) .reduceByKey((x,y)=x+y) rdd2.count() Which errors with a stack trace like: 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498) java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168) at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) at scala.collection.immutable.List.foreach(List.scala:318) rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occuring values by key in the dataset, i.e. 'These two numbers occurred with the same key n times'. I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB. My cluster setup is as follows. I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06, and spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
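To make Imran's first suggestion concrete, the pair-generation step could look roughly like this (sketched with the Java API here; the Scala version has the same shape). It still shuffles once for the groupByKey, but only the original records are shuffled rather than the 460 million joined pairs, and duplicates and reflections are kept as in the original self-join:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import scala.Tuple2;

    // rdd is a JavaPairRDD<Integer, Integer> of (key, value)
    JavaPairRDD<Tuple2<Integer, Integer>, Integer> coCounts = rdd
        .groupByKey()
        .flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Iterable<Integer>>,
                                                Tuple2<Integer, Integer>, Integer>() {
          public Iterable<Tuple2<Tuple2<Integer, Integer>, Integer>> call(
              Tuple2<Integer, Iterable<Integer>> kv) {
            List<Integer> values = new ArrayList<Integer>();
            for (Integer v : kv._2()) values.add(v);       // materialize so we can loop twice
            List<Tuple2<Tuple2<Integer, Integer>, Integer>> out =
                new ArrayList<Tuple2<Tuple2<Integer, Integer>, Integer>>();
            for (Integer a : values)
              for (Integer b : values)
                out.add(new Tuple2<Tuple2<Integer, Integer>, Integer>(
                    new Tuple2<Integer, Integer>(a, b), 1)); // keep (a,a) and both orders
            return out;
          }
        })
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer x, Integer y) { return x + y; }
        });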
Re: Is spark streaming +MlLib for online learning?
This feature request is already being tracked: https://issues.apache.org/jira/browse/SPARK-4981 Aiming for 1.4 Best, Reza On Wed, Feb 18, 2015 at 2:40 AM, mucaho muc...@yahoo.com wrote: Hi What is the general consensus/roadmap for implementing additional online / streamed trainable models? Apache Spark 1.2.1 currently supports streaming linear regression clustering, although other streaming linear methods are planned according to the issue tracker. However, I can not find any details on the issue tracker about online training of a collaborative filter. Judging from another mailing list discussion http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ce07aa61e-eeb9-4ded-be3e-3f04003e4...@storefront.be%3E incremental training should be possible for ALS. Any plans for the future? Regards mucaho -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-spark-streaming-MlLib-for-online-learning-tp19701p21698.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to pass parameters to a spark-jobserver Scala class?
Thank you very much Vasu. Let me add some more points to my question. We are developing a Java program for connection spark-jobserver to Vaadin (Java framework). Following is the sample code I wrote for connecting both (the code works fine): / URL url = null; HttpURLConnection connection = null; String strQueryUri = http://localhost:8090/jobs?appName=sparkingclassPath=sparking.jobserver.GetOrCreateUserscontext=user-context;; url = new URL(strQueryUri); connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod(POST); connection.setRequestProperty(Accept, application/json); InputStream isQueryJSON = connection.getInputStream(); LinkedHashMapString, Object queryMap = (LinkedHashMapString, Object) getJSONKeyValue(isQueryJSON, null, result); String strJobId = (String) queryMap.get(jobId);/ Can you suggest how to modify above code for passing parameters (as we do in *curl -d ...*) during job run? Hope I make sense. Sasi -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-parameters-to-a-spark-jobserver-Scala-class-tp21671p21717.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
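One way to pass the parameters is to send a request body the same way curl -d does. This sketch assumes the usual spark-jobserver convention that the POST body is a Typesafe-Config style string and that the job reads it with something like config.getString("input.string"); the key name is a placeholder, adjust it to whatever your job expects. With HttpURLConnection you enable output and write the body before reading the response (java.io.OutputStream and java.nio.charset.StandardCharsets are the extra imports):

    URL url = new URL(strQueryUri);
    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
    connection.setRequestMethod("POST");
    connection.setRequestProperty("Accept", "application/json");
    connection.setDoOutput(true);                       // we are going to send a request body

    String body = "input.string = some value here";     // same content you would pass to curl -d
    OutputStream out = connection.getOutputStream();
    out.write(body.getBytes(StandardCharsets.UTF_8));
    out.close();

    InputStream isQueryJSON = connection.getInputStream(); // then parse jobId from the JSON as before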
How to connect a mobile app (Android/iOS) with a Spark backend?
Hi, I have dependency problems using spark-core inside of an HttpServlet (see my other mail). Maybe I'm wrong?! What I want to do: I am developing a mobile app (Android and iOS) and want to connect it with Spark on the backend side. To do this I want to use Tomcat. The app uses https to ask Tomcat for the needed data, and Tomcat asks Spark. Is this the right way? Or is there a better way to connect my mobile apps with the Spark backend? I hope that I'm not the first one who wants to do this. Ralph
Periodic Broadcast in Apache Spark Streaming
I am implementing a stream learner for text classification. There are some single-valued parameters in my implementation that needs to be updated as new stream items arrive. For example, I want to change learning rate as the new predictions are made. However, I doubt that there is a way to broadcast variables after the initial broadcast. So what happens if I need to broadcast a variable every time I update it. If there is a way to do it or a workaround for what I want to accomplish in Spark Streaming, I'd be happy to hear about it. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Periodic-Broadcast-in-Apache-Spark-Streaming-tp21703.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
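There is no API for updating a broadcast variable in place, but one common workaround (a sketch, not an official mechanism) is to keep a driver-side reference to the latest broadcast and create a fresh one inside foreachRDD, which runs on the driver once per batch. Here stream is assumed to be a JavaDStream<String>, ssc a JavaStreamingContext, and the filter and the update rule are placeholders for your own logic:

    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.broadcast.Broadcast;

    // a named function class so the broadcast travels to the executors cleanly
    class LongerThan implements Function<String, Boolean> {
      private final Broadcast<Double> threshold;
      LongerThan(Broadcast<Double> threshold) { this.threshold = threshold; }
      public Boolean call(String s) { return s.length() > threshold.value(); } // placeholder logic
    }

    // driver-side holder for the most recent broadcast
    final AtomicReference<Broadcast<Double>> learningRate =
        new AtomicReference<Broadcast<Double>>(ssc.sparkContext().broadcast(0.1));

    stream.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> batch) {
        Broadcast<Double> rate = learningRate.get();          // value this batch sees
        long n = batch.filter(new LongerThan(rate)).count();

        // this block runs on the driver once per batch, so we can re-broadcast here
        double next = rate.value() * 0.95;                    // placeholder update rule
        learningRate.set(new JavaSparkContext(batch.context()).broadcast(next));
        return null;
      }
    });

Each batch then sees the value that was broadcast at the end of the previous batch, which is usually good enough for slowly changing parameters like a learning rate.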
Re: Is there a limit to the number of RDDs in a Spark context?
At some level, enough RDDs creates hundreds of thousands of tiny partitions of data each of which creates a task for each stage. The raw overhead of all the message passing can slow things down a lot. I would not design something to use an RDD per key. You would generally use key by some value you want to divide and filter on, and then use a *ByKey to do your work. You don't work with individual RDDs this way, but usually that's good news. You usually have a lot more flexibility operating just in pure Java / Scala to do whatever you need inside your function. On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi Paweł, Thanks a lot for your answer. I finally got the program to work by using aggregateByKey, but I was wondering why creating thousands of RDDs doesn't work. I think that could be interesting for using methods that work on RDDs like for example JavaDoubleRDD.stats() ( http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29). If the groups are small then I can chain groupBy(), collect(), parallelize() and stats(), but that is quite inefficient because it implies moving data to and from the driver, and also doesn't scale to big groups; on the other hand if I use aggregateByKey or a similar function then I cannot use stats() so I have to reimplement it. In general I was looking for a way to reuse other functions that I have that work on RDDs, for using them on groups of data in a RDD, because I don't see a how to directly apply them to each of the groups in a pair RDD. Again, thanks a lot for your answer, Greetings, Juan Rodriguez 2015-02-18 14:56 GMT+01:00 Paweł Szulc paul.sz...@gmail.com: Maybe you can omit using grouping all together with groupByKey? What is your next step after grouping elements by key? Are you trying to reduce values? If so then I would recommend using some reducing functions like for example reduceByKey or aggregateByKey. Those will first reduce value for each key locally on each node before doing actual IO over the network. There will also be no grouping phase so you will not run into memory issues. Please let me know if that helped Pawel Szulc @rabbitonweb http://www.rabbitonweb.com On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, I'm writing a Spark program where I want to divide a RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for big number of keys (tens of thousands) the execution gets stuck, without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does it make sense? I'm rewriting the program to use a single RDD of pairs, with cached partions, but I wanted to be sure I understand the problem here. Thanks a lot in advance, Greetings, Juan Rodriguez - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
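On reusing stats() without collecting each group to the driver: one option (a sketch, assuming the values are doubles keyed by group) is to aggregate a StatCounter per key. StatCounter is the same class that backs JavaDoubleRDD.stats(), so you get count, mean, stdev and so on for every group in a single pass:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.util.StatCounter;

    // data is a JavaPairRDD<String, Double> of (groupKey, value)
    JavaPairRDD<String, StatCounter> statsPerKey = data.aggregateByKey(
        new StatCounter(),
        new Function2<StatCounter, Double, StatCounter>() {      // fold a value into one partition's stats
          public StatCounter call(StatCounter s, Double v) { return s.merge(v); }
        },
        new Function2<StatCounter, StatCounter, StatCounter>() { // merge per-partition stats
          public StatCounter call(StatCounter a, StatCounter b) { return a.merge(b); }
        });

    // statsPerKey.mapValues(...) can then pull out mean(), stdev(), count(), etc. per group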
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
Are you proposing I downgrade Solrj's httpclient dependency to be on par with that of Spark/Hadoop? Or upgrade Spark/Hadoop's httpclient to the latest? Solrj has to stay with its selected version. I could try and rebuild Spark with the latest httpclient but I've no idea what effects that may cause on Spark. Sent from my iPhone On Feb 18, 2015, at 1:37 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi Did you try to make maven pick the latest version http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management That way solrj won't cause any issue, you can try this and check if the part of your code where you access HDFS works fine? On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm getting the below error when running spark-submit on my class. This class has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ 4.10.3 from within the class. This is in conflict with the older version, HttpClient 3.1 that's a dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4). I've tried setting spark.files.userClassPathFirst to true in SparkConf in my program, also setting it to true in $SPARK-HOME/conf/spark-defaults.conf as spark.files.userClassPathFirst true No go, I'm still getting the error, as below. Is there anything else I can try? Are there any plans in Spark to support multiple class loaders? Exception in thread main java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) ... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: How to integrate hive on spark
Hi Did you try these steps. https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started Thanks Arush On Wed, Feb 18, 2015 at 7:20 PM, sandeepvura sandeepv...@gmail.com wrote: Hi , I am new to sparks.I had installed spark on 3 node cluster.I would like to integrate hive on spark . can anyone please help me on this, Regards, Sandeep.v -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-integrate-hive-on-spark-tp21702.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Creating RDDs from within foreachPartition() [Spark-Streaming]
You can't use RDDs inside RDDs. RDDs are managed from the driver, and functions like foreachRDD execute things on the remote executors. You can write code to simply directly save whatever you want to ES. There is not necessarily a need to use RDDs for that. On Wed, Feb 18, 2015 at 11:36 AM, t1ny wbr...@gmail.com wrote: Hi all, I am trying to create RDDs from within /rdd.foreachPartition()/ so I can save these RDDs to ElasticSearch on the fly : stream.foreachRDD(rdd = { rdd.foreachPartition { iterator = { val sc = rdd.context iterator.foreach { case (cid, sid, ts) = { [...] sc.makeRDD(...).saveToEs(...) - *throws a NullPointerException (sc is null)* } } } } } Unfortunately this doesn't work as I can't seem to be able to access the SparkContext from anywhere within /foreachPartition()/. The code above throws a NullPointerException, but if I change it to ssc.sc.makeRDD (where ssc is the StreamingContext object created in the main function, outside of /foreachPartition/) then I get a NotSerializableException. What is the correct way to do this ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-RDDs-from-within-foreachPartition-Spark-Streaming-tp21700.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
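Along the lines of Sean's answer, there are two straightforward routes: the original code already uses saveToEs, and that can be called on the stream's own mapped RDD inside foreachRDD instead of building a new RDD with sc.makeRDD; or the executors can write the documents themselves from foreachPartition. A rough sketch of the latter, shown with the Java API and plain HTTP against Elasticsearch's document REST endpoint (the host, index and type names are placeholders; a real job would batch documents through the bulk API):

    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.Iterator;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;

    stream.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) {
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
          public void call(Iterator<String> docs) throws Exception {
            while (docs.hasNext()) {                 // one POST per JSON document, for brevity
              URL url = new URL("http://es-host:9200/myindex/mytype/");
              HttpURLConnection conn = (HttpURLConnection) url.openConnection();
              conn.setRequestMethod("POST");
              conn.setDoOutput(true);
              conn.getOutputStream().write(docs.next().getBytes("UTF-8"));
              conn.getInputStream().close();         // forces the request and drains the response
              conn.disconnect();
            }
          }
        });
        return null;
      }
    });

The key point is that nothing inside foreachPartition needs a SparkContext: each executor just talks to Elasticsearch directly.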
Re: How to connect a mobile app (Android/iOS) with a Spark backend?
I am running Spark Jobs behind tomcat. We didn't face any issues.But for us the user base is very small. The possible blockers could be 1. If there are many users of the system. Then jobs might have to w8, you might want to think about the kind of scheduling you want to do. 2.Again if the no of users is a bit high, Tomcat doesn't scale really well.(Not sure how much of a blocker it is). Thanks Arush On Wed, Feb 18, 2015 at 6:41 PM, Ralph Bergmann | the4thFloor.eu ra...@the4thfloor.eu wrote: Hi, I have dependency problems to use spark-core inside of a HttpServlet (see other mail from me). Maybe I'm wrong?! What I want to do: I develop a mobile app (Android and iOS) and want to connect them with Spark on backend side. To do this I want to use Tomcat. The app uses https to ask Tomcat for the needed data and Tomcat asks Spark. Is this the right way? Or is there a better way to connect my mobile apps with the Spark backend? I hope that I'm not the first one who want to do this. Ralph -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Is there a limit to the number of RDDs in a Spark context?
Hi Paweł, Thanks a lot for your answer. I finally got the program to work by using aggregateByKey, but I was wondering why creating thousands of RDDs doesn't work. I think that could be interesting for using methods that work on RDDs like for example JavaDoubleRDD.stats() ( http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29). If the groups are small then I can chain groupBy(), collect(), parallelize() and stats(), but that is quite inefficient because it implies moving data to and from the driver, and also doesn't scale to big groups; on the other hand if I use aggregateByKey or a similar function then I cannot use stats() so I have to reimplement it. In general I was looking for a way to reuse other functions that I have that work on RDDs, for using them on groups of data in a RDD, because I don't see a how to directly apply them to each of the groups in a pair RDD. Again, thanks a lot for your answer, Greetings, Juan Rodriguez 2015-02-18 14:56 GMT+01:00 Paweł Szulc paul.sz...@gmail.com: Maybe you can omit using grouping all together with groupByKey? What is your next step after grouping elements by key? Are you trying to reduce values? If so then I would recommend using some reducing functions like for example reduceByKey or aggregateByKey. Those will first reduce value for each key locally on each node before doing actual IO over the network. There will also be no grouping phase so you will not run into memory issues. Please let me know if that helped Pawel Szulc @rabbitonweb http://www.rabbitonweb.com On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, I'm writing a Spark program where I want to divide a RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for big number of keys (tens of thousands) the execution gets stuck, without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does it make sense? I'm rewriting the program to use a single RDD of pairs, with cached partions, but I wanted to be sure I understand the problem here. Thanks a lot in advance, Greetings, Juan Rodriguez
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
Hello Dmitry, I had almost the same problem and solved it by using version 4.0.0 of SolrJ: dependency groupIdorg.apache.solr/groupId artifactIdsolr-solrj/artifactId version4.0.0/version /dependency In my case, I was lucky that version 4.0.0 of SolrJ had all the functionality I needed. -- Emre Sevinç http://www.bigindustries.be/ On Wed, Feb 18, 2015 at 4:39 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I think I'm going to have to rebuild Spark with commons.httpclient.version set to 4.3.1 which looks to be the version chosen by Solrj, rather than the 4.2.6 that Spark's pom mentions. Might work. On Wed, Feb 18, 2015 at 1:37 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi Did you try to make maven pick the latest version http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management That way solrj won't cause any issue, you can try this and check if the part of your code where you access HDFS works fine? On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm getting the below error when running spark-submit on my class. This class has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ 4.10.3 from within the class. This is in conflict with the older version, HttpClient 3.1 that's a dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4). I've tried setting spark.files.userClassPathFirst to true in SparkConf in my program, also setting it to true in $SPARK-HOME/conf/spark-defaults.conf as spark.files.userClassPathFirst true No go, I'm still getting the error, as below. Is there anything else I can try? Are there any plans in Spark to support multiple class loaders? Exception in thread main java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) ... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Emre Sevinc
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
Thank you, Emre. It seems solrj still depends on HttpClient 4.1.3; would that not collide with Spark/Hadoop's default dependency on HttpClient set to 4.2.6? If that's the case that might just solve the problem. Would Solrj 4.0.0 work with the latest Solr, 4.10.3? On Wed, Feb 18, 2015 at 10:50 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Dmitry, I had almost the same problem and solved it by using version 4.0.0 of SolrJ: dependency groupIdorg.apache.solr/groupId artifactIdsolr-solrj/artifactId version4.0.0/version /dependency In my case, I was lucky that version 4.0.0 of SolrJ had all the functionality I needed. -- Emre Sevinç http://www.bigindustries.be/ On Wed, Feb 18, 2015 at 4:39 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I think I'm going to have to rebuild Spark with commons.httpclient.version set to 4.3.1 which looks to be the version chosen by Solrj, rather than the 4.2.6 that Spark's pom mentions. Might work. On Wed, Feb 18, 2015 at 1:37 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi Did you try to make maven pick the latest version http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management That way solrj won't cause any issue, you can try this and check if the part of your code where you access HDFS works fine? On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm getting the below error when running spark-submit on my class. This class has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ 4.10.3 from within the class. This is in conflict with the older version, HttpClient 3.1 that's a dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4). I've tried setting spark.files.userClassPathFirst to true in SparkConf in my program, also setting it to true in $SPARK-HOME/conf/spark-defaults.conf as spark.files.userClassPathFirst true No go, I'm still getting the error, as below. Is there anything else I can try? Are there any plans in Spark to support multiple class loaders? Exception in thread main java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) ... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Emre Sevinc
Re: Why cached RDD is recomputed again?
Thanks Sean, but I don't think that fitting into memory is the case, because: 1- I can see in the UI that 100% of RDD is cached, (moreover the RDD is quite small, 100 MB, while worker has 1.5 GB) 2- I also tried MEMORY_AND_DISK, but absolutely no difference ! Probably I have messed up somewhere else! Do you have any other idea where I should look for the cause? best, /Shahab On Wed, Feb 18, 2015 at 4:22 PM, Sean Owen so...@cloudera.com wrote: The mostly likely explanation is that you wanted to put all the partitions in memory and they don't all fit. Unless you asked to persist to memory or disk, some partitions will simply not be cached. Consider using MEMORY_OR_DISK persistence. This can also happen if blocks were lost due to node failure. On Wed, Feb 18, 2015 at 3:19 PM, shahab shahab.mok...@gmail.com wrote: Hi, I have a cached RDD (I can see in UI that it is cached), but when I use this RDD , I can see that the RDD is partially recomputed (computed) again. It is partially because I can see in UI that some task are skipped (have a look at the attached figure). Now the question is 1: what causes a cached RDD to be recomputed again? and why somes tasks are skipped and some not?? best, /Shahab - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
On Wed, Feb 18, 2015 at 4:54 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thank you, Emre. It seems solrj still depends on HttpClient 4.1.3; would that not collide with Spark/Hadoop's default dependency on HttpClient set to 4.2.6? If that's the case that might just solve the problem. Would Solrj 4.0.0 work with the latest Solr, 4.10.3? In my case, it worked; I mean I was trying to send some documents to the latest version of Solr server (v4.10.3), and using v4.0.0 of SolrJ worked without any problems so far. I couldn't find any other way to deal with this old httpclient dependency problem in Spark. -- Emre Sevinç http://www.bigindustries.be/
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
Thanks, Emre! Will definitely try this. On Wed, Feb 18, 2015 at 11:00 AM, Emre Sevinc emre.sev...@gmail.com wrote: On Wed, Feb 18, 2015 at 4:54 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thank you, Emre. It seems solrj still depends on HttpClient 4.1.3; would that not collide with Spark/Hadoop's default dependency on HttpClient set to 4.2.6? If that's the case that might just solve the problem. Would Solrj 4.0.0 work with the latest Solr, 4.10.3? In my case, it worked; I mean I was trying to send some documents to the latest version of Solr server (v4.10.3), and using v4.0.0 of SolrJ worked without any problems so far. I couldn't find any other way to deal with this old httpclient dependency problem in Spark. -- Emre Sevinç http://www.bigindustries.be/
Re: How to connect a mobile app (Android/iOS) with a Spark backend?
This does not sound like a Spark problem -- doesn't even necessarily sound like a distributed problem. Are you of a scale where building simple logic in a web tier that queries a NoSQL / SQL database doesn't work? If you are at such a scale, then it sounds like you're describing a very high volume of small, very low latency queries. Spark is not designed for that. However it could do some crunching in the background and feed a serving layer technology like a NoSQL store with recent results, for example. On Wed, Feb 18, 2015 at 3:23 PM, Ralph Bergmann | the4thFloor.eu ra...@the4thfloor.eu wrote: Hi, Am 18.02.15 um 15:58 schrieb Sean Owen: That said, it depends a lot on what you are trying to do. What are you trying to do? You just say you're connecting to spark. There are 2 tasks I want to solve with Spark. 1) The user opens the mobile app. The app sends a pink to the backend. When this happens the backend has to collect some data from other server via http and has to do some stuff with this data. 2) The mobile app can download this data from 1. In this case the backend has to find/create the right data (depending on user location, rating, etc.) Ralph - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Learning GraphX Questions
Thanks for all the responses so far! I have started to understand the system more, but I just had another question while I was going along. Is there a way to check the individual partitions of an RDD? For example, if I had a graph with vertices a,b,c,d and it was split into 2 partitions could I check which vertices belonged in partition 1 and parition 2? Thank You, Matthew Bucci On Fri, Feb 13, 2015 at 10:58 PM, Ankur Dave ankurd...@gmail.com wrote: At 2015-02-13 12:19:46 -0800, Matthew Bucci mrbucci...@gmail.com wrote: 1) How do you actually run programs in GraphX? At the moment I've been doing everything live through the shell, but I'd obviously like to be able to work on it by writing and running scripts. You can create your own projects that build against Spark and GraphX through a Maven dependency [1], then run those applications using the bin/spark-submit script included with Spark [2]. These guides assume you already know how to do this using your preferred build tool (SBT or Maven). In short, here's how to do it with SBT: 1. Install SBT locally (`brew install sbt` on OS X). 2. Inside your project directory, create a build.sbt file listing Spark and GraphX as a dependency, as in [3]. 3. Run `sbt package` in a shell. 4. Pass the JAR in your_project_dir/target/scala-2.10/ to bin/spark-submit. [1] http://spark.apache.org/docs/latest/programming-guide.html#linking-with-spark [2] http://spark.apache.org/docs/latest/submitting-applications.html [3] https://gist.github.com/ankurdave/1fb7234d8affb3a2e4f4 2) Is there a way to check the status of the partitions of a graph? For example, I want to determine for starters if the number of partitions requested are always made, like if I ask for 8 partitions but only have 4 cores what happens? You can look at `graph.vertices` and `graph.edges`, which are both RDDs, so you can do for example: graph.vertices.partitions 3) Would I be able to partition by vertex instead of edges, even if I had to write it myself? I know partitioning by edges is favored in a majority of the cases, but for the sake of research I'd like to be able to do both. If you pass PartitionStrategy.EdgePartition1D, this will partition edges by their source vertices, so all edges with the same source will be co-partitioned, and the communication pattern will be similar to vertex-partitioned (edge-cut) systems like Giraph. 4) Is there a better way to time processes outside of using built-in unix timing through the logs or something? I think the options are Unix timing, log file timestamp parsing, looking at the web UI, or writing timing code within your program (System.currentTimeMillis and System.nanoTime). Ankur
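On inspecting which elements ended up in which partition: for a small RDD, one approach is mapPartitionsWithIndex, tagging every element with its partition id and collecting the result. The sketch below uses the Java API on a generic RDD of vertex ids; graph.vertices and graph.edges are RDDs, so the same idea applies to them directly in Scala:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;

    // vertexIds is a JavaRDD<Long>; only do this on small data, collect() pulls it to the driver
    List<String> placement = vertexIds.mapPartitionsWithIndex(
        new Function2<Integer, Iterator<Long>, Iterator<String>>() {
          public Iterator<String> call(Integer partition, Iterator<Long> it) {
            List<String> out = new ArrayList<String>();
            while (it.hasNext()) out.add("partition " + partition + " -> vertex " + it.next());
            return out.iterator();
          }
        }, true).collect();

    for (String s : placement) System.out.println(s);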
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
I think I'm going to have to rebuild Spark with commons.httpclient.version set to 4.3.1 which looks to be the version chosen by Solrj, rather than the 4.2.6 that Spark's pom mentions. Might work. On Wed, Feb 18, 2015 at 1:37 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi Did you try to make maven pick the latest version http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management That way solrj won't cause any issue, you can try this and check if the part of your code where you access HDFS works fine? On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm getting the below error when running spark-submit on my class. This class has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ 4.10.3 from within the class. This is in conflict with the older version, HttpClient 3.1 that's a dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4). I've tried setting spark.files.userClassPathFirst to true in SparkConf in my program, also setting it to true in $SPARK-HOME/conf/spark-defaults.conf as spark.files.userClassPathFirst true No go, I'm still getting the error, as below. Is there anything else I can try? Are there any plans in Spark to support multiple class loaders? Exception in thread main java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) ... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Is Databricks log analysis reference app only based on Java API
Sorry for the noise. I have found it. At 2015-02-18 23:34:40, Todd bit1...@163.com wrote: Looks like the log analysis reference app provided by Databricks at https://github.com/databricks/reference-apps only has a Java API? I'd like to see the Scala version.
Is Databricks log analysis reference app only based on Java API
Looks like the log analysis reference app provided by Databricks at https://github.com/databricks/reference-apps only has a Java API? I'd like to see the Scala version.
How to integrate hive on spark
Hi, I am new to Spark. I have installed Spark on a 3-node cluster. I would like to integrate Hive on Spark. Can anyone please help me with this? Regards, Sandeep.v -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-integrate-hive-on-spark-tp21702.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is there a limit to the number of RDDs in a Spark context?
Maybe you can omit using grouping all together with groupByKey? What is your next step after grouping elements by key? Are you trying to reduce values? If so then I would recommend using some reducing functions like for example reduceByKey or aggregateByKey. Those will first reduce value for each key locally on each node before doing actual IO over the network. There will also be no grouping phase so you will not run into memory issues. Please let me know if that helped Pawel Szulc @rabbitonweb http://www.rabbitonweb.com On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, I'm writing a Spark program where I want to divide a RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for big number of keys (tens of thousands) the execution gets stuck, without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does it make sense? I'm rewriting the program to use a single RDD of pairs, with cached partions, but I wanted to be sure I understand the problem here. Thanks a lot in advance, Greetings, Juan Rodriguez
Re: How to connect a mobile app (Android/iOS) with a Spark backend?
Although you can do lots of things, I don't think Spark is something you should think of as a synchronous, real-time query API. So, somehow trying to use it directly from a REST API is probably not the best architecture. That said, it depends a lot on what you are trying to do. What are you trying to do? You just say you're connecting to spark. On Wed, Feb 18, 2015 at 1:11 PM, Ralph Bergmann | the4thFloor.eu ra...@the4thfloor.eu wrote: Hi, I have dependency problems to use spark-core inside of a HttpServlet (see other mail from me). Maybe I'm wrong?! What I want to do: I develop a mobile app (Android and iOS) and want to connect them with Spark on backend side. To do this I want to use Tomcat. The app uses https to ask Tomcat for the needed data and Tomcat asks Spark. Is this the right way? Or is there a better way to connect my mobile apps with the Spark backend? I hope that I'm not the first one who want to do this. Ralph - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
so if you only change this line: https://gist.github.com/emres/0fb6de128baea099e741#file-mymoduledriver-java-L137 to json.print() it processes 16 files instead? I am totally perplexed. My only suggestions to help debug are (1) see what happens when you get rid of MyModuleWorker completely -- change MyModuleDriver#process to just inStream.print() and see what happens (2) stick a bunch of printlns into MyModuleWorker#call (3) turn on DEBUG logging for org.apache.spark.streaming.dstream.FileInputDStream my gut instinct is that something else is flaky about the file input stream (eg., it makes some assumption about the file system which maybe aren't valid in your case, it has a bunch of caveats), and that it has just happened to work sometimes with your foreachRdd and failed sometimes with print. Sorry I am not a lot of help in this case, hope this leads you down the right track or somebody else can help out. Imran On Wed, Feb 18, 2015 at 2:28 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Imran, (a) I know that all 20 files are processed when I use foreachRDD, because I can see the processed files in the output directory. (My application logic writes them to an output directory after they are processed, *but* that writing operation does not happen in foreachRDD, below you can see the URL that includes my code and clarifies this). (b) I know only 16 files are processed because in the output directory I see only 16 files processed. I wait for minutes and minutes and no more files appear in the output directory. When I see only 16 files are processed and Spark Streaming went to the mode of idly watching the input directory, and then if I copy a few more files, they are also processed. (c) Sure, you can see part of my code in the following gist: https://gist.github.com/emres/0fb6de128baea099e741 It might seem a little convoluted at first, because my application is divided into two classes, a Driver class (setting up things and initializing them), and a Worker class (that implements the core functionality). I've also put the relevant methods from the my utility classes for completeness. I am as perplexed as you are as to why forcing the output via foreachRDD ended up in different behaviour compared to simply using print() method. Kind regards, Emre On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras...@cloudera.com wrote: Hi Emre, there shouldn't be any difference in which files get processed w/ print() vs. foreachRDD(). In fact, if you look at the definition of print(), it is just calling foreachRDD() underneath. So there is something else going on here. We need a little more information to figure out exactly what is going on. (I think Sean was getting at the same thing ...) (a) how do you know that when you use foreachRDD, all 20 files get processed? (b) How do you know that only 16 files get processed when you print()? Do you know the other files are being skipped, or maybe they are just stuck somewhere? eg., suppose you start w/ 20 files, and you see 16 get processed ... what happens after you add a few more files to the directory? Are they processed immediately, or are they never processed either? (c) Can you share any more code of what you are doing to the dstreams *before* the print() / foreachRDD()? That might give us more details about what the difference is. I can't see how .count.println() would be different than just println(), but maybe I am missing something also. 
Imran On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com wrote: Sean, In this case, I've been testing the code on my local machine and using Spark locally, so I all the log output was available on my terminal. And I've used the .print() method to have an output operation, just to force Spark execute. And I was not using foreachRDD, I was only using print() method on a JavaDStream object, and it was working fine for a few files, up to 16 (and without print() it did not do anything because there were no output operations). To sum it up, in my case: - Initially, use .print() and no foreachRDD: processes up to 16 files and does not do anything for the remaining 4. - Remove .print() and use foreachRDD: processes all of the 20 files. Maybe, as in Akhil Das's suggestion, using .count.print() might also have fixed my problem, but I'm satisfied with foreachRDD approach for now. (Though it is still a mystery to me why using .print() had a difference, maybe my mental model of Spark is wrong, I thought no matter what output operation I used, the number of files processed by Spark would be independent of that because the processing is done in a different method, .print() is only used to force Spark execute that processing, am I wrong?). -- Emre On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote: Materialization shouldn't be relevant. The collect by itself doesn't let you detect whether it
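For Imran's suggestion (3), assuming the stock log4j setup that ships with Spark, that usually comes down to adding one line to conf/log4j.properties:

    log4j.logger.org.apache.spark.streaming.dstream.FileInputDStream=DEBUG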
Re: Why cached RDD is recomputed again?
The most likely explanation is that you wanted to put all the partitions in memory and they don't all fit. Unless you asked to persist to memory and disk, some partitions will simply not be cached. Consider using MEMORY_AND_DISK persistence. This can also happen if blocks were lost due to node failure. On Wed, Feb 18, 2015 at 3:19 PM, shahab shahab.mok...@gmail.com wrote: Hi, I have a cached RDD (I can see in the UI that it is cached), but when I use this RDD, I can see that the RDD is partially recomputed (computed) again. It is partial because I can see in the UI that some tasks are skipped (have a look at the attached figure). Now the question is 1: what causes a cached RDD to be recomputed again? And why are some tasks skipped and some not? best, /Shahab - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
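For reference, explicitly requesting the spill-to-disk behaviour Sean describes looks like this in the Java API; the storage level is MEMORY_AND_DISK, whereas a plain cache() gives MEMORY_ONLY:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.storage.StorageLevel;

    JavaRDD<String> data = sc.textFile("hdfs:///input").persist(StorageLevel.MEMORY_AND_DISK());
    // partitions that do not fit in memory are written to local disk instead of being recomputed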
Re: How to connect a mobile app (Android/iOS) with a Spark backend?
Hi, On 18.02.15 at 15:58, Sean Owen wrote: That said, it depends a lot on what you are trying to do. What are you trying to do? You just say you're connecting to spark. There are 2 tasks I want to solve with Spark. 1) The user opens the mobile app. The app sends a ping to the backend. When this happens the backend has to collect some data from other servers via http and has to do some stuff with this data. 2) The mobile app can download this data from 1. In this case the backend has to find/create the right data (depending on user location, rating, etc.) Ralph
Re: WARN from Similarity Calculation
I am still debugging it but I believe if m% of users have unusually large columns and the RDD partitioner on RowMatrix is hashPartitioner then due to the basic algorithm without sampling, some partitions can cause unusually large number of keys... If my debug shows that I will add a custom partitioner for RowMatrix (will be useful for sparse vectors, for dense vector it does not matter)... Of course from feature engineering, we will see if we can cut off the users with large number of columns... On Tue, Feb 17, 2015 at 1:58 PM, Xiangrui Meng men...@gmail.com wrote: It may be caused by GC pause. Did you check the GC time in the Spark UI? -Xiangrui On Sun, Feb 15, 2015 at 8:10 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am sometimes getting WARN from running Similarity calculation: 15/02/15 23:07:55 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, abc.com, 48419, 0) with no recent heart beats: 66435ms exceeds 45000ms Do I need to increase the default 45 s to larger values for cases where we are doing blocked operation or long compute in the mapPartitions ? Thanks. Deb
Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Hi Tom, there are a couple of things you can do here to make this more efficient. first, I think you can replace your self-join with a groupByKey. on your example data set, this would give you (1, Iterable(2,3)) (4, Iterable(3)) this reduces the amount of data that needs to be shuffled, and that way you can produce all of your pairs just from the Iterable(2,3). second, if you expect the same pairs to appear many times in your dataset, you might first want to replace them with a count. eg., if you start with (1,2) (1,2) (1,2) ... (1,2) (1,3) (1,3) (4,3) ... you might want to first convert that to get a count of each pair val pairCounts = rdd.map{x = (x,1)}.reduceByKey{_ + _} to give you something like: ((1,2), 145) ((1,3), 2) ((4,3), 982) ... and then with a little more massaging you can group by key and also keep the counts of each item: val groupedCounts: RDD[(Int, Iterable[(Int,Int)])] = pairCounts.map{case((key, value), counts) = key - (value,counts) }.groupByKey which would give you something like (1, Iterable((2,145), (3, 2)) (4, Iterable((3, 982)) hope this helps Imran On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote: Thanks for the reply, I'll try your suggestions. Apologies, in my previous post I was mistaken. rdd is actually an PairRDD of (Int, Int). I'm doing the self-join so I can count two things. First, I can count the number of times a value appears in the data set. Second I can count number of times values occur with the same key. For example, if I have the following: (1,2) (1,3) (4,3) Then joining with itself I get: (1,(2,2)) - map - ((2,2),1) - reduceByKey - ((2,2),1) (1,(2,3)) - map - ((2,3),1) - reduceByKey - ((2,3),1) (1,(3,2)) - map - ((3,2),1) - reduceByKey - ((3,2),1) (1,(3,3)) - map - ((3,3),1) - reduceByKey - ((3,3),2) (4,(3,3)) - map - ((3,3),1) _| Note that I want to keep the duplicates (2,2) and reflections. Rgds On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote: Why are you joining the rdd with itself? You can try these things: - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or MEMORY_AND_DISK_SER, so that it doesnt need to keep everything up in memory. - Set your default Serializer to Kryo (.set(spark.serializer, org.apache.spark.serializer.KryoSerializer)) - Enable rdd compression (.set(spark.rdd.compress,true)) Thanks Best Regards On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote: Hi All, I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing a OOM exception in the executor after about 125/1000 tasks during the map stage. val rdd2 = rdd.join(rdd, numPartitions=1000) .map(fp=((fp._2._1, fp._2._2), 1)) .reduceByKey((x,y)=x+y) rdd2.count() Which errors with a stack trace like: 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498) java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168) at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48) at scala.collection.immutable.List.foreach(List.scala:318) rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occuring values by key in the dataset, i.e. 'These two numbers occurred with the same key n times'. 
I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB. My cluster setup is as follows. I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06, and spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate the issue. I've also tried increasing the number of partitions after the join dramatically (up to 15000). Nothing has been effective. Thus, I'm beginning to suspect I don't have enough resources for the job. Does anyone have a feeling about what the resource requirements would be for a use-case like this? I could scale the cluster up if necessary, but would like to avoid it. I'm willing to accept longer computation times if that is an option. Warm Regards, Thomas
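A minimal, self-contained Scala sketch of the groupByKey approach described above, using the toy (key, value) pairs from the thread; the object wrapper, app name and partition count are illustrative, not taken from the original code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object CooccurrenceCountSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cooccurrence-sketch"))

    // (key, value) pairs, as in the thread's example
    val rdd = sc.parallelize(Seq((1, 2), (1, 3), (4, 3)))

    // Group values by key instead of self-joining: (1, Iterable(2, 3)), (4, Iterable(3))
    val grouped = rdd.groupByKey(numPartitions = 1000)

    // Emit every ordered value pair per key (keeping duplicates and reflections),
    // then count how often each pair co-occurs under the same key.
    val pairCounts = grouped
      .flatMap { case (_, values) => for (v1 <- values; v2 <- values) yield ((v1, v2), 1) }
      .reduceByKey(_ + _)

    pairCounts.collect().foreach(println)
    sc.stop()
  }
}

This avoids materializing the full self-join output across the shuffle: the pair expansion happens after grouping, and reduceByKey can combine counts map-side before the second shuffle. Very skewed keys can still produce large groups, so it is a sketch of the idea rather than a guaranteed fix for the OOM.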
Re: Spark can't pickle class: error cannot lookup attribute
Currently, PySpark cannot pickle a class defined in the current script ('__main__'). The workaround is to put the implementation of the class into a separate module, then use bin/spark-submit --py-files xxx.py to deploy it.

in xxx.py:

class test(object):
    def __init__(self, a, b):
        self.total = a + b

in job.py:

from xxx import test
a = sc.parallelize([(True,False),(False,False)])
a.map(lambda (x,y): test(x,y))

run it by: bin/spark-submit --py-files xxx.py job.py

On Wed, Feb 18, 2015 at 1:48 PM, Guillaume Guy guillaume.c@gmail.com wrote: Hi, This is a duplicate of the stack-overflow question here. I hope to generate more interest on this mailing list. The problem: I am running into some attribute lookup problems when trying to initiate a class within my RDD. My workflow is quite standard: 1- Start with an RDD 2- Take each element of the RDD, initiate an object for each 3- Reduce (I will write a method that will define the reduce operation later on) Here is #2:

class test(object):
    def __init__(self, a, b):
        self.total = a + b

a = sc.parallelize([(True,False),(False,False)])
a.map(lambda (x,y): test(x,y))

Here is the error I get: PicklingError: Can't pickle class '__main__.test': attribute lookup __main__.test failed. I'd like to know if there is any way around it. Please answer with a working example to achieve the intended results (i.e. creating an RDD of objects of class test). Thanks in advance! Related question: https://groups.google.com/forum/#!topic/edx-code/9xzRJFyQwn GG
Re: JdbcRDD, ClassCastException with scala.Function0
That's exactly what I was doing. However, I ran into runtime issues with doing that. For instance, I had a public class DbConnection extends AbstractFunction0<Connection> implements Serializable. I got a runtime error from Spark complaining that DbConnection wasn't an instance of scala.Function0. I also had a public class MapRow extends scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements Serializable, with which I seemed to have more luck.

On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger c...@koeninger.org wrote: Can't you implement the org.apache.spark.api.java.function.Function interface and pass an instance of that to JdbcRDD.create?

On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Cody, you were right, I had a copy and paste snag where I ended up with a vanilla SparkContext rather than a Java one. I also had to *not* use my function subclasses, rather just use anonymous inner classes for the Function stuff, and that got things working. I'm fully following the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim. Is there a clean way to refactor out the custom Function classes, such as the one for getting a db connection or mapping ResultSet data to your own POJOs, rather than doing it all inline?

On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger c...@koeninger.org wrote: Is sc there a SparkContext or a JavaSparkContext? The compilation error seems to indicate the former, but JdbcRDD.create expects the latter.

On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I have tried that as well, I get a compile error -- [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for create(SparkContext,<anonymous ConnectionFactory>,String,int,int,int,<anonymous Function<ResultSet,Integer>>) The code is a copy and paste:

JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
  sc,
  new JdbcRDD.ConnectionFactory() {
    public Connection getConnection() throws SQLException {
      return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
    }
  },
  "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
  1, 100, 1,
  new Function<ResultSet, Integer>() {
    public Integer call(ResultSet r) throws Exception {
      return r.getInt(1);
    }
  });

The other thing I've tried was to define a static class locally for GetConnection and use the JdbcRDD constructor. This got around the compile issues but blew up at runtime with NoClassDefFoundError: scala/runtime/AbstractFunction0!

JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
  sc,
  (AbstractFunction0<Connection>) new DbConn(), // had to cast or a compile error
  SQL_QUERY,
  0L, 1000L, 10,
  new MapRow(),
  ROW_CLASS_TAG);

// DbConn is defined as public static class DbConn extends AbstractFunction0<Connection> implements Serializable

On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote: That test I linked https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90 is calling a static method JdbcRDD.create, not new JdbcRDD. Is that what you tried doing?

On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thanks, Cody. Yes, I originally started off by looking at that but I get a compile error if I try and use that approach: constructor JdbcRDD in class JdbcRDD<T> cannot be applied to given types. Not to mention that JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last argument). Wonder if it's a JDK version issue, I'm using 1.7.
So I've got this, which doesn't compile:

JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
  new SparkContext(conf),
  new JdbcRDD.ConnectionFactory() {
    public Connection getConnection() throws SQLException {
      Connection conn = null;
      try {
        Class.forName(JDBC_DRIVER);
        conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
      } catch (ClassNotFoundException ex) {
        throw new RuntimeException("Error while loading JDBC driver.", ex);
      }
      return conn;
    }
  },
  "SELECT * FROM EMPLOYEES",
  0L, 1000L, 10,
  new Function<ResultSet, Row>() {
    public Row call(ResultSet r) throws Exception {
      return null; // have some actual logic here...
    }
  },
  scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));

The other approach was mimicking the DbConnection class from this post: http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/. It got around any of the compilation issues, but then I got the runtime error where Spark wouldn't recognize the db connection class as a scala.Function0.

On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger c...@koeninger.org wrote: Take a look at https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java

On Wed, Feb 18 at
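For reference, a rough Scala sketch of the same call: the Scala JdbcRDD constructor takes plain function values, which is what the Java API has to emulate with ConnectionFactory / Function and why JdbcRDD.create exists. The Derby URL and query below are the ones from JavaJdbcRDDSuite; treat the exact parameter list as approximate for your Spark version:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

object JdbcRddScalaSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("jdbcrdd-sketch"))

    // In Scala no AbstractFunction0/AbstractFunction1 subclasses or explicit ClassTags are needed
    val jdbcRDD = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"),
      "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
      1, 100, 3,                      // lower bound, upper bound, number of partitions
      (r: ResultSet) => r.getInt(1))  // row-mapping function

    println(jdbcRDD.count())
    sc.stop()
  }
}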
Re: ML Transformer
Hi Cesar, Thanks for trying out Pipelines and bringing up this issue! It's been an experimental API, but feedback like this will help us prepare it for becoming non-Experimental. I've made a JIRA, and will vote for this being protected (instead of private[ml]) for Spark 1.3: https://issues.apache.org/jira/browse/SPARK-5902 Thanks again, Joseph

On Wed, Feb 18, 2015 at 12:17 PM, Cesar Flores ces...@gmail.com wrote: I am working right now with the ML pipeline, which I really like. However, in order to make real use of it, I would like to create my own transformers that implement org.apache.spark.ml.Transformer. In order to do that, a method from PipelineStage needs to be implemented, but this method is private to the ml package: private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType Can users create their own transformers? If not, will this functionality be added in the future? Thanks -- Cesar Flores
No suitable driver found error, Create table in hive from spark sql
I am getting a "No suitable driver found" error when creating a table in Hive from Spark SQL. I am trying to execute the following example. SPARK GIT: spark/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala My setup: hadoop 1.6, spark 1.2, hive 1.0, mysql server (installed via yum install mysql55w mysql55w-server). I can create tables in hive from the hive command prompt:

hive> select * from person_parquet;
OK
Barack Obama M
Bill Clinton M
Hillary Clinton F
Time taken: 1.945 seconds, Fetched: 3 row(s)

I am starting spark shell via the following command:

./spark-1.2.0-bin-hadoop2.4/bin/spark-shell --master spark://sparkmaster.company.com:7077 --jars /data/mysql-connector-java-5.1.14-bin.jar

scala> Class.forName("com.mysql.jdbc.Driver")
res0: Class[_] = class com.mysql.jdbc.Driver
scala> Class.forName("com.mysql.jdbc.Driver").newInstance
res1: Any = com.mysql.jdbc.Driver@2dec8e27
scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@32ecf100
scala> sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
15/02/18 22:23:01 INFO parse.ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING)
15/02/18 22:23:02 INFO parse.ParseDriver: Parse Completed
15/02/18 22:23:02 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/02/18 22:23:02 INFO metastore.ObjectStore: ObjectStore, initialize called
15/02/18 22:23:02 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/02/18 22:23:02 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/02/18 22:23:02 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/02/18 22:23:02 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/02/18 22:23:02 ERROR Datastore.Schema: Failed initialising database.
No suitable driver found for jdbc:mysql://sparkmaster.company.com:3306/hive org.datanucleus.exceptions.NucleusDataStoreException: No suitable driver found for jdbc:mysql://sparkmaster.company.com:3306/hive at org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:516) at org.datanucleus.store.rdbms.RDBMSStoreManager.init(RDBMSStoreManager.java:298) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631) at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301) at org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1187) at org.datanucleus.NucleusContext.initialise(NucleusContext.java:356) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:775) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965) at java.security.AccessController.doPrivileged(Native Method) at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960) at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701) at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310) at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339) at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248) at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) at
Re: Processing graphs
Hi, Thanks for your reply. I basically want to check if my understanding of what parallelize() does on RDDs is correct. In my case, I create a vertex RDD and an edge RDD and distribute them by calling parallelize(). Now, does Spark perform any operation on these RDDs in parallel? For example, if I apply groupBy on the edge RDD (grouping by source vertex) and call a function F on the grouped RDD, will F be applied to each group in parallel, and will Spark determine how to do this in parallel regardless of the number of groups? Thanks.

On Tue, Feb 17, 2015 at 5:03 PM, Yifan LI iamyifa...@gmail.com wrote: Hi Kannan, I am not sure I have understood what your question is exactly, but maybe the reduceByKey or reduceByKeyLocally functionality better suits your need. Best, Yifan LI

On 17 Feb 2015, at 17:37, Vijayasarathy Kannan kvi...@vt.edu wrote: Hi, I am working on a Spark application that processes graphs and I am trying to do the following: - group the vertices (key - vertex, value - set of its outgoing edges) - distribute each key to separate processes and process them (like a mapper) - reduce the results back at the main process. Does the groupBy functionality do the distribution by default? Do we have to explicitly use RDDs to enable automatic distribution? It'd be great if you could help me understand these and how to go about with the problem. Thanks.
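A short Scala sketch of the pattern being asked about; the edge data and the per-group function f are made-up placeholders. After parallelize, the edge RDD is partitioned across the cluster, and groupByKey plus mapValues lets Spark apply f to each group on whichever executor holds it, regardless of how many groups there are. Work only actually runs when an action such as collect or count is called:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object GraphGroupingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("graph-grouping-sketch"))

    // Edges as (sourceVertex, destinationVertex) pairs -- placeholder data
    val edges = sc.parallelize(Seq((1L, 2L), (1L, 3L), (2L, 3L)))

    // Placeholder for the per-group function F: here, just count the outgoing edges
    def f(outgoing: Iterable[Long]): Int = outgoing.size

    // Each (sourceVertex, Iterable(destinations)) group is processed in parallel on the executors
    val perVertex = edges.groupByKey().mapValues(f)

    perVertex.collect().foreach { case (src, result) => println(s"$src -> $result") }
    sc.stop()
  }
}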
Re: Spark Streaming output cannot be used as input?
+1 for writing the Spark output to Kafka. You can then hang multiple compute/storage frameworks off Kafka. I am using a similar pipeline to feed ElasticSearch and HDFS in parallel. It allows modularity: you can take down ElasticSearch or HDFS for maintenance without losing data (except for some edge cases). You can even pipeline other Spark Streaming apps off Kafka to modularize your processing pipeline, so you don't have one single big Spark app doing all the processing.

On Wed, Feb 18, 2015 at 3:34 PM, Jose Fernandez jfernan...@sdl.com wrote: Thanks for the advice folks, it is much appreciated. This seems like a pretty unfortunate design flaw. My team was surprised by it. I'm going to drop the two-step process and do it all in a single step until we get Kafka online.

*From:* Sean Owen [mailto:so...@cloudera.com] *Sent:* Wednesday, February 18, 2015 1:53 AM *To:* Emre Sevinc *Cc:* Jose Fernandez; user@spark.apache.org *Subject:* Re: Spark Streaming output cannot be used as input?

To clarify, sometimes in the world of Hadoop people freely refer to an output 'file' when it's really a directory containing 'part-*' files which are pieces of the file. It's imprecise but that's the meaning. I think the scaladoc may be referring to 'the path to the file, which includes this parent dir, is generated ...' In an inherently distributed system, you want to distribute writes and reads, so big files are really made of logical files within a directory. There is a JIRA open to support nested dirs which has been languishing: https://issues.apache.org/jira/browse/SPARK-3586 I'm hoping to pursue that again with help from tdas after 1.3. That's probably the best solution. An alternative is to not use the file system as a sort of message queue, and instead use something like Kafka. It has a lot of other benefits, but maybe it's not feasible to add this to your architecture. You can merge the files with HDFS APIs without much trouble. The dirs will be named consistently according to time and are something you can also query for. Making 1 partition has implications for the parallelism of your job. Emre, I think I see what you're getting at, but you have the map + materialize pattern, which I think doesn't have the right guarantees about re-execution. Why not foreachRDD? Yes, you can also consider collecting the whole RDD in foreachRDD and doing what you like, including writing to one file. But that would only work if the data is always small in each RDD.

On Wed, Feb 18, 2015 at 8:50 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Jose, We've hit the same issue a couple of months ago. It is possible to write directly to files instead of creating directories, but it is not straightforward, and I haven't seen any clear demonstration in books, tutorials, etc.
We do something like:

SparkConf sparkConf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(batchInterval));
JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
MyModuleApp.process(stream);

And then in the process method:

@Override
public void process(JavaDStream<String> inStream) {
  JavaDStream<String> json = inStream.map(new MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir));
  forceOutput(json);
}

This, in turn, calls the following (I've removed the irrelevant lines to focus on writing):

public class MyModuleWorker implements Function<String,String> {
  public String call(String json) {
    // process the data and then write it
    writeJSON(json, validatedJSONoutputDir_);
  }
}

And the writeJSON method is:

public static final void writeJSON(String json, String jsonDirPath) throws IOException {
  String jsonFileName = jsonDirPath + "/" + UUID.randomUUID().toString() + ".json.tmp";
  URI uri = URI.create(jsonFileName);
  Configuration conf = new Configuration();
  FileSystem fileSystem = FileSystem.get(uri, conf);
  FSDataOutputStream out = fileSystem.create(new Path(uri));
  out.write(json.getBytes(StandardCharsets.UTF_8));
  out.close();
  fileSystem.rename(new Path(uri), new Path(URI.create(jsonDirPath + "/" + UUID.randomUUID().toString() + ".json")));
}

Using a similar technique you might be able to achieve your objective. Kind regards, Emre Sevinç http://www.bigindustries.be/

On Wed, Feb 18, 2015 at 4:32 AM, Jose
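As a hedged Scala sketch of the foreachRDD alternative Sean mentions above, only sensible when each micro-batch is small enough to collect to the driver, each batch is written to exactly one HDFS file instead of a part-* directory; the stream and output directory names are placeholders:

import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

object SingleFilePerBatchSketch {
  def writeBatches(json: DStream[String], outputDir: String): Unit = {
    json.foreachRDD { (rdd: RDD[String], time: Time) =>
      val records = rdd.collect()   // only safe when each batch is small
      if (records.nonEmpty) {
        // One file per micro-batch, named by batch time plus a random suffix
        val path = new Path(s"$outputDir/batch-${time.milliseconds}-${UUID.randomUUID()}.json")
        val fs = FileSystem.get(URI.create(outputDir), new Configuration())
        val out = fs.create(path)
        try {
          records.foreach(r => out.write((r + "\n").getBytes(StandardCharsets.UTF_8)))
        } finally {
          out.close()
        }
      }
    }
  }
}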
Re: Spark and Spark Streaming code sharing best practice.
Thanks Arush. I will check that out.

On Wed, Feb 18, 2015 at 11:06 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: I find monoids pretty useful in this respect, basically separating out the logic into a monoid and then applying the logic to either a stream or a batch. A list of such practices could be really useful.

On Thu, Feb 19, 2015 at 12:26 AM, Jean-Pascal Billaud j...@tellapart.com wrote: Hey, it seems pretty clear that one of the strengths of Spark is being able to share your code between your batch and streaming layers. Though, given that Spark Streaming uses a DStream (a set of RDDs) while batch Spark uses a single RDD, there may be some complexity associated with it. Of course, since a DStream is a superset of RDDs, one can just run the same code at the RDD granularity using DStream::foreachRDD. While this should work for map, I am not sure how that can work when it comes to the reduce phase, given that a group of keys spans multiple RDDs. One option is to change the dataset object on which a job works. For example, instead of passing an RDD to a class method, one passes a higher-level object (MetaRDD) that wraps around an RDD or DStream depending on the context. At this point the job calls its regular maps, reduces and so on, and the MetaRDD wrapper delegates accordingly. I'd just like to know the official best practice from the Spark community, though. Thanks,

-- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
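A bare-bones Scala sketch of the idea in this thread (the word-count logic and names are illustrative): write the core transformation once as a function from RDD to RDD, call it directly in the batch job, and reuse it per micro-batch in the streaming job via DStream.transform:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object SharedLogic {
  // Business logic written once, against a single RDD
  def countWords(lines: RDD[String]): RDD[(String, Long)] =
    lines.flatMap(_.split("\\s+"))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
}

object BatchJob {
  def run(lines: RDD[String]): RDD[(String, Long)] =
    SharedLogic.countWords(lines)              // batch: apply directly
}

object StreamingJob {
  def run(lines: DStream[String]): DStream[(String, Long)] =
    lines.transform(SharedLogic.countWords _)  // streaming: apply to each micro-batch
}

As the thread notes, this only shares per-micro-batch logic; aggregations that must span batch boundaries still need stateful streaming operators (e.g. updateStateByKey) or a wrapper along the lines of the MetaRDD idea.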
Re: No suitable driver found error, Create table in hive from spark sql
Found the solution in one of the posts on the internet: I updated spark/bin/compute-classpath.sh and added the database connector jar to the classpath. CLASSPATH=$CLASSPATH:/data/mysql-connector-java-5.1.14-bin.jar
RE: Spark Streaming output cannot be used as input?
Thanks for the advice folks, it is much appreciated. This seems like a pretty unfortunate design flaw. My team was surprised by it. I'm going to drop the two-step process and do it all in a single step until we get Kafka online.
Re: Spark Streaming and message ordering
This is a *fantastic* question. The idea of how we identify individual things in multiple DStreams is worth looking at. The reason being that you can then fine-tune your streaming job based on the RDD identifiers (i.e., are the timestamps from the producer correlating closely to the order in which RDD elements are being produced?). If *NO*, then you need to (1) dial up throughput on the producer sources or else (2) increase cluster size so that Spark is capable of evenly handling the load. You can't decide to do (1) or (2) unless you can track when the streaming elements are being converted to RDDs by Spark itself.

On Wed, Feb 18, 2015 at 6:54 PM, Neelesh neele...@gmail.com wrote: There does not seem to be a definitive answer on this. Every time I google for message ordering, the only relevant thing that comes up is this - http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html . With a kafka receiver that pulls data from a single kafka partition of a kafka topic, are individual messages in the microbatch in the same order as the kafka partition? Are successive microbatches originating from a kafka partition executed in order? Thanks! -- jay vyas
RE: NotSerializableException: org.apache.http.impl.client.DefaultHttpClient when trying to send documents to Solr
You need to instantiate the server in the forEachPartition block or Spark will attempt to serialize it to the task. See the design patterns section in the Spark Streaming guide. Jose Fernandez | Principal Software Developer jfernan...@sdl.com

-----Original Message----- From: dgoldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, February 18, 2015 1:54 PM To: user@spark.apache.org Subject: NotSerializableException: org.apache.http.impl.client.DefaultHttpClient when trying to send documents to Solr

I'm using SolrJ in a Spark program. When I try to send the docs to Solr, I get the NotSerializableException on the DefaultHttpClient. Is there a possible fix or workaround? I'm using Spark 1.2.1 with Hadoop 2.4, SolrJ is version 4.0.0.

final HttpSolrServer solrServer = new HttpSolrServer(SOLR_SERVER_URL);
...
JavaRDD<SolrInputDocument> solrDocs = rdd.map(new Function<Row, SolrInputDocument>() {
  public SolrInputDocument call(Row r) {
    return r.toSolrDocument();
  }
});

solrDocs.foreachPartition(new VoidFunction<Iterator<SolrInputDocument>>() {
  public void call(Iterator<SolrInputDocument> solrDocIterator) throws Exception {
    List<SolrInputDocument> batch = new ArrayList<SolrInputDocument>();
    while (solrDocIterator.hasNext()) {
      SolrInputDocument inputDoc = solrDocIterator.next();
      batch.add(inputDoc);
      if (batch.size() >= batchSize) {
        Utils.sendBatchToSolr(solrServer, solrCollection, batch);
      }
    }
    if (!batch.isEmpty()) {
      Utils.sendBatchToSolr(solrServer, solrCollection, batch);
    }
  }
});

Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1478) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:789) at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:195) at org.apache.spark.api.java.JavaRDD.foreachPartition(JavaRDD.scala:32) at com.kona.motivis.spark.proto.SparkProto.execute(SparkProto.java:158) at com.kona.motivis.spark.proto.SparkProto.main(SparkProto.java:186) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: org.apache.http.impl.client.DefaultHttpClient at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at
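A hedged Scala rendering of the fix described above (the Solr URL, batch size, and direct SolrJ calls are stand-ins for the original Utils.sendBatchToSolr helper): the client is constructed inside foreachPartition, so it is created on each executor and never has to be serialized with the closure:

import scala.collection.JavaConverters._

import org.apache.solr.client.solrj.impl.HttpSolrServer
import org.apache.solr.common.SolrInputDocument
import org.apache.spark.rdd.RDD

object SolrSinkSketch {
  def indexToSolr(docs: RDD[SolrInputDocument], solrUrl: String, batchSize: Int): Unit = {
    docs.foreachPartition { partition =>
      // Created per partition, on the executor: nothing non-serializable crosses the wire
      val solrServer = new HttpSolrServer(solrUrl)
      partition.grouped(batchSize).foreach { batch =>
        solrServer.add(batch.asJava)   // stand-in for Utils.sendBatchToSolr(...)
      }
      solrServer.commit()
      solrServer.shutdown()
    }
  }
}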
Spark Streaming and message ordering
There does not seem to be a definitive answer on this. Every time I google for message ordering, the only relevant thing that comes up is this - http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html . With a kafka receiver that pulls data from a single kafka partition of a kafka topic, are individual messages in the microbatch in the same order as the kafka partition? Are successive microbatches originating from a kafka partition executed in order? Thanks!