Re: Building Spark
My 2 cents: If you have Java 8, you don't need any extra settings for Maven. -- Emre Sevinç On Wed, May 13, 2015 at 3:02 PM, Stephen Boesch java...@gmail.com wrote: Hi Akhil, Building with sbt tends to need around 3.5GB, whereas Maven's requirements are much lower, around 1.7GB. So try using Maven. For reference I have the following settings and both do compile; sbt would not work with lower values. $ echo $SBT_OPTS -Xmx3012m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m $ echo $MAVEN_OPTS -Xmx1280m -XX:MaxPermSize=384m 2015-05-13 5:57 GMT-07:00 Heisenberg Bb hbbalg...@gmail.com: I tried to build Spark on my local machine, Ubuntu 14.04 (4 GB RAM), and my system is hanging (freezing). When I monitored the system processes, the build process was consuming 85% of my memory. Why does it need so many resources? Is there a more efficient method to build Spark? Thanks, Akhil -- Emre Sevinc
Re: How to deal with code that runs before foreach block in Apache Spark?
Imran, Gerard, Indeed your suggestions were correct and they helped me. Thank you for your replies. -- Emre On Tue, May 5, 2015 at 4:24 PM, Imran Rashid iras...@cloudera.com wrote: Gerard is totally correct -- to expand a little more, I think what you want to do is a solrInputDocumentJavaRDD.foreachPartition, instead of solrInputDocumentJavaRDD.foreach: solrInputDocumentJavaRDD.foreachPartition( new VoidFunction<Iterator<SolrInputDocument>>() { @Override public void call(Iterator<SolrInputDocument> docItr) { List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(); while (docItr.hasNext()) { // Add the solrInputDocument to the list of SolrInputDocuments docs.add(docItr.next()); } // push things to solr **from the executor, for this partition** // so for this to make sense, you need to be sure solr can handle a bunch // of executors pushing into it simultaneously. addThingsToSolr(docs); } }); On Mon, May 4, 2015 at 8:44 AM, Gerard Maas gerard.m...@gmail.com wrote: I'm not familiar with the Solr API but provided that 'SolrIndexerDriver' is a singleton, I guess that what's going on when running on a cluster is that the call to: SolrIndexerDriver.solrInputDocumentList.add(elem) is happening on different singleton instances of the SolrIndexerDriver on different JVMs while SolrIndexerDriver.solrServer.commit is happening on the driver. In practical terms, the lists on the executors are being filled in but they are never committed, and on the driver the opposite is happening. -kr, Gerard On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc emre.sev...@gmail.com wrote: I'm trying to deal with some code that runs differently in Spark stand-alone mode and on a cluster. Basically, for each item in an RDD, I'm trying to add it to a list, and once this is done, I want to send this list to Solr. This works perfectly fine when I run the following code in stand-alone mode of Spark, but does not work when the same code is run on a cluster. When I run the same code on a cluster, it is as if the send-to-Solr part of the code is executed before the list to be sent to Solr is filled with items. I tried to force execution by calling solrInputDocumentJavaRDD.collect(); after the foreach, but it seems to have no effect.
// For each RDD solrInputDocumentJavaDStream.foreachRDD( new Function<JavaRDD<SolrInputDocument>, Void>() { @Override public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception { // For each item in a single RDD solrInputDocumentJavaRDD.foreach( new VoidFunction<SolrInputDocument>() { @Override public void call(SolrInputDocument solrInputDocument) { // Add the solrInputDocument to the list of SolrInputDocuments SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument); } }); // Try to force execution solrInputDocumentJavaRDD.collect(); // After having finished adding every SolrInputDocument to the list // add it to the solrServer, and commit, waiting for the commit to be flushed try { // Seems like when run in cluster mode, the list size is zero, // therefore the following part is never executed if (SolrIndexerDriver.solrInputDocumentList != null && SolrIndexerDriver.solrInputDocumentList.size() > 0) { SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList); SolrIndexerDriver.solrServer.commit(true, true); SolrIndexerDriver.solrInputDocumentList.clear(); } } catch (SolrServerException | IOException e) { e.printStackTrace(); } return null; } } ); What should I do so that the sending-to-Solr part executes after the SolrInputDocuments are added to solrInputDocumentList (and also works in cluster mode)? -- Emre Sevinç -- Emre Sevinc
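A minimal sketch of the foreachPartition approach from Imran's reply, written out in full. This is an illustration under the thread's assumptions, not the actual project code: addThingsToSolr is the hypothetical helper from Imran's snippet that pushes a batch of documents to Solr, and solrInputDocumentJavaDStream is the stream from the original question.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

solrInputDocumentJavaDStream.foreachRDD(
  new Function<JavaRDD<SolrInputDocument>, Void>() {
    @Override
    public Void call(JavaRDD<SolrInputDocument> rdd) throws Exception {
      rdd.foreachPartition(
        new VoidFunction<Iterator<SolrInputDocument>>() {
          @Override
          public void call(Iterator<SolrInputDocument> docItr) {
            // Collect the documents of this partition on the executor
            List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
            while (docItr.hasNext()) {
              docs.add(docItr.next());
            }
            if (!docs.isEmpty()) {
              // Push to Solr from the executor, once per partition
              addThingsToSolr(docs);
            }
          }
        });
      return null;
    }
  });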
How to deal with code that runs before foreach block in Apache Spark?
I'm trying to deal with some code that runs differently in Spark stand-alone mode and on a cluster. Basically, for each item in an RDD, I'm trying to add it to a list, and once this is done, I want to send this list to Solr. This works perfectly fine when I run the following code in stand-alone mode of Spark, but does not work when the same code is run on a cluster. When I run the same code on a cluster, it is as if the send-to-Solr part of the code is executed before the list to be sent to Solr is filled with items. I tried to force execution by calling solrInputDocumentJavaRDD.collect(); after the foreach, but it seems to have no effect. // For each RDD solrInputDocumentJavaDStream.foreachRDD( new Function<JavaRDD<SolrInputDocument>, Void>() { @Override public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception { // For each item in a single RDD solrInputDocumentJavaRDD.foreach( new VoidFunction<SolrInputDocument>() { @Override public void call(SolrInputDocument solrInputDocument) { // Add the solrInputDocument to the list of SolrInputDocuments SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument); } }); // Try to force execution solrInputDocumentJavaRDD.collect(); // After having finished adding every SolrInputDocument to the list // add it to the solrServer, and commit, waiting for the commit to be flushed try { // Seems like when run in cluster mode, the list size is zero, // therefore the following part is never executed if (SolrIndexerDriver.solrInputDocumentList != null && SolrIndexerDriver.solrInputDocumentList.size() > 0) { SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList); SolrIndexerDriver.solrServer.commit(true, true); SolrIndexerDriver.solrInputDocumentList.clear(); } } catch (SolrServerException | IOException e) { e.printStackTrace(); } return null; } } ); What should I do so that the sending-to-Solr part executes after the SolrInputDocuments are added to solrInputDocumentList (and also works in cluster mode)? -- Emre Sevinç
Re: Spark Unit Testing
Hello James, Did you check the following resources: - https://github.com/apache/spark/tree/master/streaming/src/test/java/org/apache/spark/streaming - http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs -- Emre Sevinç http://www.bigindustries.be/ On Tue, Apr 21, 2015 at 1:26 PM, James King jakwebin...@gmail.com wrote: I'm trying to write some unit tests for my spark code. I need to pass a JavaPairDStreamString, String to my spark class. Is there a way to create a JavaPairDStream using Java API? Also is there a good resource that covers an approach (or approaches) for unit testing using Java. Regards jk -- Emre Sevinc
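One way to get a JavaPairDStream in a unit test, sketched below, is to feed a queue of pre-built RDDs into queueStream and then map to pairs. This is only an illustration (the test data, batch duration, and splitting logic are made up), not something taken from the resources linked above.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "unit-test", new Duration(1000));

// Pre-built test input: each queued RDD becomes one micro-batch
Queue<JavaRDD<String>> rddQueue = new LinkedList<JavaRDD<String>>();
rddQueue.add(ssc.sparkContext().parallelize(Arrays.asList("a:1", "b:2")));

JavaDStream<String> lines = ssc.queueStream(rddQueue);
JavaPairDStream<String, String> pairs = lines.mapToPair(
    new PairFunction<String, String, String>() {
      @Override
      public Tuple2<String, String> call(String line) {
        String[] parts = line.split(":");
        return new Tuple2<String, String>(parts[0], parts[1]);
      }
    });
// 'pairs' can now be handed to the class under test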
Re: override log4j.properties
One method: By putting your custom log4j.properties file in your /resources directory. As an example, please see: http://stackoverflow.com/a/2736/236007 Kind regards, Emre Sevinç http://www.bigindustries.be/ On Thu, Apr 9, 2015 at 2:17 PM, patcharee patcharee.thong...@uni.no wrote: Hello, How to override log4j.properties for a specific spark job? BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Emre Sevinc
Re: Query REST web service with Spark?
Hello Minnow, It is possible. You can, for example, use the Jersey REST client to query a web service and get its results in a Spark job. In fact, that's exactly what we did in a recent project (in a Spark Streaming application). Kind regards, Emre Sevinç http://www.bigindustries.be/ On Tue, Mar 31, 2015 at 10:46 PM, Minnow Noir minnown...@gmail.com wrote: We have some data on Hadoop that needs to be augmented with data only available to us via a REST service. We're using Spark to search for, and correct, missing data. Even though there are a lot of records to scour for missing data, the total number of calls to the service is expected to be low, so it would be ideal to do the whole job in Spark as we scour the data. I don't see anything obvious in the API or on Google relating to making REST calls from a Spark job. Is it possible? Thanks, Alec -- Emre Sevinc
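For what it's worth, here is a rough sketch of the idea (my own illustration, not the code from our project): the JAX-RS 2.0 client API, which Jersey 2 implements, called from mapPartitions so that one HTTP client is created per partition. The endpoint URL and the 'records' RDD are made up.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

JavaRDD<String> augmented = records.mapPartitions(
    new FlatMapFunction<Iterator<String>, String>() {
      @Override
      public Iterable<String> call(Iterator<String> it) {
        // One REST client per partition, created on the executor
        Client client = ClientBuilder.newClient();
        List<String> out = new ArrayList<String>();
        while (it.hasNext()) {
          String id = it.next();
          String response = client.target("http://example.com/api/records/" + id)
                                  .request(MediaType.APPLICATION_JSON)
                                  .get(String.class);
          out.add(response);
        }
        client.close();
        return out;
      }
    });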
Re: log4j.properties in jar
Hello Udit, Yes, what you ask is possible. If you follow the Spark documentation and tutorial about how to build stand-alone applications, you can see that it is possible to build a stand-alone, über-JAR file that includes everything. For example, if you want to suppress some messages by modifying log4j in unit tests, you can do the following: http://stackoverflow.com/questions/27248997/how-to-suppress-spark-logging-in-unit-tests/2736#2736 Hope this helps. -- Emre Sevinç http://www.bigindustries.be/ On Mon, Mar 30, 2015 at 10:24 PM, Udit Mehta ume...@groupon.com wrote: Hi, Is it possible to put the log4j.properties in the application jar such that the driver and the executors use this log4j file. Do I need to specify anything while submitting my app so that this file is used? Thanks, Udit -- Emre Sevinc
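As an aside, a tiny sketch of an alternative I sometimes use (my own suggestion, not part of the linked answer): quieting Spark's logging programmatically in unit tests with the log4j 1.x API instead of shipping a log4j.properties file.

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public final class QuietSparkLogs {
  // Call once at the start of a test suite to reduce Spark's log noise
  public static void silence() {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
    Logger.getLogger("org.eclipse.jetty").setLevel(Level.WARN);
    Logger.getLogger("akka").setLevel(Level.WARN);
  }
}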
Re: Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?
Hello Sandy, Thank you for your explanation. Then I would at least expect that to be consistent across local, yarn-client, and yarn-cluster modes. (And not lead to the case where it somehow works in two of them but not in the third.) Kind regards, Emre Sevinç http://www.bigindustries.be/ On Tue, Mar 24, 2015 at 4:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Ah, yes, I believe this is because only properties prefixed with "spark." get passed on. The purpose of the --conf option is to allow passing Spark properties to the SparkConf, not to add general key-value pairs to the JVM system properties. -Sandy On Tue, Mar 24, 2015 at 4:25 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Sandy, Your suggestion does not work when I try it locally: When I pass --conf key=someValue and then try to retrieve it like: SparkConf sparkConf = new SparkConf(); logger.info("* * * key ~~~ {}", sparkConf.get("key")); I get Exception in thread "main" java.util.NoSuchElementException: key And I think that's expected because the key is an arbitrary one, not necessarily a Spark configuration element. This is why I was passing it via --conf and retrieving it with System.getProperty("key") (which worked locally and in yarn-client mode, but not in yarn-cluster mode). I'm surprised that I can't use it on the cluster while I can use it during local development and testing. Kind regards, Emre Sevinç http://www.bigindustries.be/ On Mon, Mar 23, 2015 at 6:15 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Emre, The --conf property is meant to work with yarn-cluster mode. System.getProperty("key") isn't guaranteed, but new SparkConf().get("key") should. Does it not? -Sandy On Mon, Mar 23, 2015 at 8:39 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, According to the Spark documentation at https://spark.apache.org/docs/1.2.1/submitting-applications.html : --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown). And indeed, when I use that parameter, in my Spark program I can retrieve the value of the key by using: System.getProperty("key"); This works when I test my program locally, and also in yarn-client mode: I can log the value of the key and see that it matches what I wrote on the command line. But it returns *null* when I submit the very same program in *yarn-cluster* mode. Why can't I retrieve the value of a key given as --conf key=value when I submit my Spark application in *yarn-cluster* mode? Any ideas and/or workarounds? -- Emre Sevinç http://www.bigindustries.be/ -- Emre Sevinc -- Emre Sevinc
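To make Sandy's point concrete, a small sketch (the property name spark.myapp.greeting is made up): prefix the arbitrary key with "spark." so that --conf passes it through, then read it back via SparkConf rather than System.getProperty.

// submitted e.g. with: spark-submit ... --conf spark.myapp.greeting=someValue ...
import org.apache.spark.SparkConf;

SparkConf sparkConf = new SparkConf();
// The second argument is a default in case the property was not supplied
String greeting = sparkConf.get("spark.myapp.greeting", "defaultValue");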
Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?
Hello, According to the Spark documentation at https://spark.apache.org/docs/1.2.1/submitting-applications.html : --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown). And indeed, when I use that parameter, in my Spark program I can retrieve the value of the key by using: System.getProperty("key"); This works when I test my program locally, and also in yarn-client mode: I can log the value of the key and see that it matches what I wrote on the command line. But it returns *null* when I submit the very same program in *yarn-cluster* mode. Why can't I retrieve the value of a key given as --conf key=value when I submit my Spark application in *yarn-cluster* mode? Any ideas and/or workarounds? -- Emre Sevinç http://www.bigindustries.be/
Re: log files of failed task
Hello Sergun, Generally you can use yarn application -list to see the applicationIDs of applications, and then you can see the logs of finished applications using: yarn logs -applicationId <applicationID> Hope this helps. -- Emre Sevinç http://www.bigindustries.be/ On Mon, Mar 23, 2015 at 8:23 AM, sergunok ser...@gmail.com wrote: Hi, I executed a task on Spark in YARN and it failed. I see just an "executor lost" message from YARNClientScheduler, no further details. (I read this error can be connected to the spark.yarn.executor.memoryOverhead setting and have already played with this param.) How can I dig deeper into the log files and find the exact reason? How can the log of a failed task be examined? Unfortunately I don't have access to the Spark UI; I can only use the command line. Thanks! Serg. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/log-files-of-failed-task-tp22183.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Emre Sevinc
Re: Writing Spark Streaming Programs
Hello James, I've been working with Spark Streaming for the last 6 months, and I'm coding in Java 7. Even though I haven't encountered any blocking issues with that combination, I'd definitely pick Scala if the decision were up to me. I agree with Gerard and Charles on this one. If you can, go with Scala for Spark Streaming applications. Cheers, Emre Sevinç http://www.bigindustries.be/ On Thu, Mar 19, 2015 at 4:09 PM, James King jakwebin...@gmail.com wrote: Many thanks Gerard, this is very helpful. Cheers! On Thu, Mar 19, 2015 at 4:02 PM, Gerard Maas gerard.m...@gmail.com wrote: Try writing this Spark Streaming idiom in Java and you'll choose Scala soon enough: dstream.foreachRDD { rdd => rdd.foreachPartition( partition => ... ) } When deciding between Java and Scala for Spark, IMHO Scala has the upper hand. If you're concerned with readability, have a look at the Scala coding style recently open sourced by Databricks: https://github.com/databricks/scala-style-guide (btw, I don't agree with a good part of it, but I recognize that it can keep the most complex Scala constructions out of your code) On Thu, Mar 19, 2015 at 3:50 PM, James King jakwebin...@gmail.com wrote: Hello All, I'm using Spark for streaming but I'm unclear on which implementation language to use: Java, Scala or Python. I don't know anything about Python, am familiar with Scala, and have been doing Java for a long time. I think the above shouldn't influence my decision on which language to use, because I believe the tool should fit the problem. In terms of performance, Java and Scala are comparable. However, Java is OO and Scala is FP; no idea what Python is. If using Scala without applying a consistent style of programming, Scala code can become unreadable, but I do like the fact that it seems possible to do so much work with so much less code; that's a strong selling point for me. Also it could be that the type of programming done in Spark is best implemented in Scala as an FP language, though I'm not sure. The question I would like your help with is: are there any other considerations I need to think about when deciding this? Are there any recommendations you can make in this regard? Regards jk -- Emre Sevinc
Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?
I've also tried the following: Configuration hadoopConfiguration = new Configuration(); hadoopConfiguration.set(multilinejsoninputformat.member, itemSet); JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration, factory, false); but I still get the same exception. Why doesn't getOrCreate ignore that Hadoop configuration part (which normally works, e.g. when not recovering)? -- Emre On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have a Spark Streaming application (that uses Spark 1.2.1) that listens to an input directory, and when new JSON files are copied to that directory processes them, and writes them to an output directory. It uses a 3rd party library to process the multi-line JSON files ( https://github.com/alexholmes/json-mapreduce). You can see the relevant part of the streaming application at: https://gist.github.com/emres/ec18ee264e4eb0dd8f1a When I run this application locally, it works perfectly fine. But then I wanted to test whether it could recover from failure, e.g. if I stopped it right in the middle of processing some files. I started the streaming application, copied 100 files to the input directory, and hit Ctrl+C when it has alread processed about 50 files: ... 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 [Stage 0:== (65 + 4) / 100] ^C Then I started the application again, expecting that it could recover from the checkpoint. For a while it started to read files again and then gave an exception: ... 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 WARN SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.io.IOException: Missing configuration value for multilinejsoninputformat.member at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Since in the exception it refers to a missing configuration multilinejsoninputformat.member, I think it is about the following line: ssc.ssc().sc().hadoopConfiguration().set( multilinejsoninputformat.member, itemSet); And this is why I also log the value of it, and as you can see above, just before it gives the exception in the recovery process, it shows that multilinejsoninputformat.member is set to itemSet. But somehow it is not found during the recovery. This exception happens only when it tries to recover from a previously interrupted run. I've also tried moving the above line into the createContext method, but still had the same exception. Why is that? And how can I work around it? -- Emre Sevinç http://www.bigindustries.be/ -- Emre Sevinc
Is FileInputDStream returned by fileStream method a reliable receiver?
Is FileInputDStream returned by fileStream method a reliable receiver? In the Spark Streaming Guide it says: There can be two kinds of data sources based on their *reliability*. Sources (like Kafka and Flume) allow the transferred data to be acknowledged. If the system receiving data from these *reliable* sources acknowledge the received data correctly, it can be ensured that no data gets lost due to any kind of failure. This leads to two kinds of receivers. 1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges a reliable source that the data has been received and stored in Spark with replication. 2. *Unreliable Receiver* - These are receivers for sources that do not support acknowledging. Even for reliable sources, one may implement an unreliable receiver that do not go into the complexity of acknowledging correctly. So I wonder whether the receivers for HDFS (and local file system) are reliable, e.g. when I'm using fileStream method to process files in a directory locally or on HDFS? -- Emre Sevinç
Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?
I'm adding this 3rd party library to my Maven pom.xml file so that it's embedded into the JAR I send to spark-submit: <dependency> <groupId>json-mapreduce</groupId> <artifactId>json-mapreduce</artifactId> <version>1.0-SNAPSHOT</version> <exclusions> <exclusion> <groupId>javax.servlet</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>commons-io</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>commons-lang</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </exclusion> </exclusions> </dependency> Then I build my über JAR, and then I run my Spark Streaming application via the command line: spark-submit --class com.example.schemavalidator.SchemaValidatorDriver --master local[4] --deploy-mode client target/myapp-1.0-SNAPSHOT.jar -- Emre Sevinç On Wed, Mar 4, 2015 at 11:19 AM, Tathagata Das t...@databricks.com wrote: That could be a corner case bug. How do you add the 3rd party library to the class path of the driver? Through spark-submit? Could you give the command you used? TD On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc emre.sev...@gmail.com wrote: I've also tried the following: Configuration hadoopConfiguration = new Configuration(); hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet"); JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration, factory, false); but I still get the same exception. Why is that Hadoop configuration ignored by getOrCreate when recovering (it normally works, e.g. when not recovering)? -- Emre On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have a Spark Streaming application (that uses Spark 1.2.1) that listens to an input directory, and when new JSON files are copied to that directory it processes them and writes the results to an output directory. It uses a 3rd party library to process the multi-line JSON files ( https://github.com/alexholmes/json-mapreduce). You can see the relevant part of the streaming application at: https://gist.github.com/emres/ec18ee264e4eb0dd8f1a When I run this application locally, it works perfectly fine. But then I wanted to test whether it could recover from failure, e.g. if I stopped it right in the middle of processing some files. I started the streaming application, copied 100 files to the input directory, and hit Ctrl+C when it had already processed about 50 files: ... 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 [Stage 0:== (65 + 4) / 100] ^C Then I started the application again, expecting that it could recover from the checkpoint. For a while it started to read files again and then gave an exception: ...
2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 WARN SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.io.IOException: Missing configuration value for multilinejsoninputformat.member at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run
Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?
Hello, I have a Spark Streaming application (that uses Spark 1.2.1) that listens to an input directory, and when new JSON files are copied to that directory processes them, and writes them to an output directory. It uses a 3rd party library to process the multi-line JSON files ( https://github.com/alexholmes/json-mapreduce). You can see the relevant part of the streaming application at: https://gist.github.com/emres/ec18ee264e4eb0dd8f1a When I run this application locally, it works perfectly fine. But then I wanted to test whether it could recover from failure, e.g. if I stopped it right in the middle of processing some files. I started the streaming application, copied 100 files to the input directory, and hit Ctrl+C when it has alread processed about 50 files: ... 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 [Stage 0:== (65 + 4) / 100] ^C Then I started the application again, expecting that it could recover from the checkpoint. For a while it started to read files again and then gave an exception: ... 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 WARN SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.io.IOException: Missing configuration value for multilinejsoninputformat.member at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Since in the exception it refers to a missing configuration multilinejsoninputformat.member, I think it is about the following line: ssc.ssc().sc().hadoopConfiguration().set(multilinejsoninputformat.member , itemSet); And this is why I also log the value of it, and as you can see above, just before it gives the exception in the recovery process, it shows that multilinejsoninputformat.member is set 
to itemSet. But somehow it is not found during the recovery. This exception happens only when it tries to recover from a previously interrupted run. I've also tried moving the above line into the createContext method, but still had the same exception. Why is that? And how can I work around it? -- Emre Sevinç http://www.bigindustries.be/
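For reference, a condensed sketch of the recovery setup being described, pieced together from the thread and the gist (createContext and the directory variables are the application's own methods and values, abbreviated here); this is the combination that still loses the Hadoop setting on recovery:

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
  @Override
  public JavaStreamingContext create() {
    JavaStreamingContext ssc = createContext(inputDirectory, outputDirectory, checkpointDirectory);
    // Setting the key inside the factory was also tried
    ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet");
    return ssc;
  }
};

Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");

// The getOrCreate overload that takes a Hadoop Configuration was also tried, with the same result
JavaStreamingContext ssc =
    JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration, factory, false);
ssc.start();
ssc.awaitTermination();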
Re: Issues reading in Json file with spark sql
According to Spark SQL Programming Guide: jsonFile - loads data from a directory of JSON files where each line of the files is a JSON object. Note that the file that is offered as jsonFile is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail. -- Emre Sevinç http://www.bigindustries.be On Mar 2, 2015 8:29 PM, kpeng1 kpe...@gmail.com wrote: Hi All, I am currently having issues reading in a json file using spark sql's api. Here is what the json file looks like: { namespace: spacey, name: namer, type: record, fields: [ {name:f1,type:[null,string]}, {name:f2,type:[null,string]}, {name:f3,type:[null,string]}, {name:f4,type:[null,string]}, {name:f5,type:[null,string]}, {name:f6,type:[null,string]}, {name:f7,type:[null,string]}, {name:f8,type:[null,string]}, {name:f9,type:[null,string]}, {name:f10,type:[null,string]}, {name:f11,type:[null,string]}, {name:f12,type:[null,string]}, {name:f13,type:[null,string]}, {name:f14,type:[null,string]}, {name:f15,type:[null,string]} ] } This is what I am doing to read in the json file(using spark sql in the spark shell on CDH5.3): val sqlsc = new org.apache.spark.sql.SQLContext(sc) val j = sqlsc.jsonFile(/tmp/try.avsc) This is what I am getting as an error: 15/03/02 11:23:45 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 12, 10.0.2.15): scala.MatchError: namespace (of class java.lang.String) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851) at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350) at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.1 in stage 3.0 (TID 14, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 13) in 128 ms on 10.0.2.15 (1/2) 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.1 in stage 3.0 (TID 14) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 1] 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.2 in stage 3.0 (TID 15, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.2 in stage 3.0 (TID 15) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 2] 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.3 in stage 3.0 (TID 16, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO 
TaskSetManager: Lost task 0.3 in stage 3.0 (TID 16) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 3] 15/03/02 11:23:45 ERROR TaskSetManager: Task 0 in stage 3.0 failed 4 times; aborting job 15/03/02 11:23:45 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 15/03/02 11:23:45 INFO TaskSchedulerImpl: Cancelling stage 3 15/03/02 11:23:45 INFO DAGScheduler: Job 3 failed: reduce at JsonRDD.scala:57, took 0.210707 s org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 16, 10.0.2.15): scala.MatchError: namespace (of class java.lang.String) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at
Re: Which one is faster / consumes less memory: collect() or count()?
On Thu, Feb 26, 2015 at 4:20 PM, Sean Owen so...@cloudera.com wrote: Yea we discussed this on the list a short while ago. The extra overhead of count() is pretty minimal. Still you could wrap this up as a utility method. There was even a proposal to add some 'materialize' method to RDD. I would definitely vote for that proposal. -- Emre
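A minimal sketch of the utility-method idea Sean mentions (my own wrapping, using the no-op foreachPartition form from the other reply):

import java.util.Iterator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;

public final class RDDUtils {
  // Force evaluation of an RDD without copying any data back to the driver
  public static <T> void materialize(JavaRDD<T> rdd) {
    rdd.foreachPartition(new VoidFunction<Iterator<T>>() {
      @Override
      public void call(Iterator<T> it) {
        // no-op: running the job is what forces the computation
      }
    });
  }
}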
Re: Which one is faster / consumes less memory: collect() or count()?
Hello Sean, Thank you for your advice. Based on your suggestion, I've modified the code into the following (and once again admired the easy (!) verbosity of Java compared to the 'complex and hard to understand' brevity (!) of Scala): javaDStream.foreachRDD( new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { stringJavaRDD.foreachPartition( new VoidFunction<Iterator<String>>() { @Override public void call(Iterator<String> iteratorString) { return; } } ); return null; } }); I've tested the above in my application and also observed it with VisualVM, but could not see a dramatic difference in speed (or in heap usage) compared to my initial version where I just use .count() in a foreachRDD block. Nevertheless I'll make more experiments to see if differences come up in terms of speed/memory. Kind regards, Emre Sevinç http://www.bigindustries.be/ On Thu, Feb 26, 2015 at 2:34 PM, Sean Owen so...@cloudera.com wrote: Those do quite different things. One counts the data; the other copies all of the data to the driver. The fastest way to materialize an RDD that I know of is foreachPartition(i => None) (or the equivalent no-op VoidFunction in Java) On Thu, Feb 26, 2015 at 1:28 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have a piece of code to force the materialization of RDDs in my Spark Streaming program, and I'm trying to understand which method is faster and has less memory consumption: javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { //stringJavaRDD.collect(); // or count? //stringJavaRDD.count(); return null; } }); I've checked the source code of Spark at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala , and see that collect() is defined as: def collect(): Array[T] = { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } and count() is defined as: def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum Therefore I think calling the count() method is faster and/or consumes less memory, but I wanted to be sure. Does anyone care to comment? -- Emre Sevinç http://www.bigindustries.be/ -- Emre Sevinc
Re: Which one is faster / consumes less memory: collect() or count()?
Francois, Thank you for quickly verifying. Kind regards, Emre Sevinç On Thu, Feb 26, 2015 at 2:32 PM, francois.garil...@typesafe.com wrote: The short answer: count(), as the sum can be partially aggregated on the mappers. The long answer: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html — FG On Thu, Feb 26, 2015 at 2:28 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have a piece of code to force the materialization of RDDs in my Spark Streaming program, and I'm trying to understand which method is faster and has less memory consumption: javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { //stringJavaRDD.collect(); // or count? //stringJavaRDD.count(); return null; } }); I've checked the source code of Spark at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala, and see that collect() is defined as: def collect(): Array[T] = { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } and count() is defined as: def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum Therefore I think calling the count() method is faster and/or consumes less memory, but I wanted to be sure. Does anyone care to comment? -- Emre Sevinç http://www.bigindustries.be/ -- Emre Sevinc
Which one is faster / consumes less memory: collect() or count()?
Hello, I have a piece of code to force the materialization of RDDs in my Spark Streaming program, and I'm trying to understand which method is faster and has less memory consumption: javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { //stringJavaRDD.collect(); // or count? //stringJavaRDD.count(); return null; } }); I've checked the source code of Spark at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala, and see that collect() is defined as: def collect(): Array[T] = { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } and count() is defined as: def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum Therefore I think calling the count() method is faster and/or consumes less memory, but I wanted to be sure. Does anyone care to comment? -- Emre Sevinç http://www.bigindustries.be/
Re: Get filename in Spark Streaming
Hello Subacini, Until someone more knowledgeable suggests a better, more straightforward, and simpler approach with a working code snippet, I suggest the following workaround / hack: inputStream.foreachRDD(rdd = val myStr = rdd.toDebugString // process myStr string value, e.g. using regular expressions ) For example if you print myStr, you can see in your log / consol output somehing similar to: 15/02/24 15:14:56 INFO FileInputFormat: Total input paths to process : 1 15/02/24 15:14:56 INFO JobScheduler: Added jobs for time 1424787295000 ms 15/02/24 15:14:56 INFO JobScheduler: Starting job streaming job 1424787295000 ms.0 from job set of time 1424787295000 ms (20) MappedRDD[27] at textFileStream at kmeans.scala:17 [] | UnionRDD[26] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL14.json NewHadoopRDD[6] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL11.json NewHadoopRDD[7] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL10.json NewHadoopRDD[8] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL6.json NewHadoopRDD[9] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL8.json NewHadoopRDD[10] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL5.json NewHadoopRDD[11] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL1.json NewHadoopRDD[12] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL9.json NewHadoopRDD[13] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL2.json NewHadoopRDD[14] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL16.json NewHadoopRDD[15] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL20.json NewHadoopRDD[16] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL12.json NewHadoopRDD[17] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL4.json NewHadoopRDD[18] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL19.json NewHadoopRDD[19] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL7.json NewHadoopRDD[20] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL17.json NewHadoopRDD[21] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL18.json NewHadoopRDD[22] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL3.json NewHadoopRDD[23] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL13.json NewHadoopRDD[24] at textFileStream at kmeans.scala:17 [] | file:/home/emre/data/train/newsMessageNL15.json NewHadoopRDD[25] at textFileStream at kmeans.scala:17 [] 15/02/24 15:14:56 INFO JobScheduler: Finished job streaming job 1424787295000 ms.0 from job set of time 1424787295000 ms 15/02/24 15:14:56 INFO JobScheduler: Total delay: 1.420 s for time 1424787295000 ms (execution: 0.051 s) 15/02/24 15:14:56 INFO MappedRDD: Removing RDD 5 from persistence list 15/02/24 15:14:56 INFO BlockManager: Removing RDD 5 15/02/24 15:14:56 INFO FileInputDStream: Cleared 0 old files that were older than 1424787235000 ms: 15/02/24 15:14:56 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer() You can process the string to retrieve each section that starts with file: and ends with a space. 
Then, for each such string, you can extract your timestamp from the file name. -- Emre Sevinç http://www.bigindustries.be/ On Fri, Feb 6, 2015 at 9:33 PM, Subacini B subac...@gmail.com wrote: Thank you Emre, this helps, I am able to get the filename. But I am not sure how to fit this into the DStream of RDDs. val inputStream = ssc.textFileStream("/hdfs Path/") inputStream is a DStream of RDDs, and in foreachRDD I am doing my processing: inputStream.foreachRDD(rdd => { // how to get the filename here?? }) Can you please help. On Thu, Feb 5, 2015 at 11:15 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, Did you check the following? http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/ http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-td6551.html -- Emre Sevinç On Fri, Feb 6, 2015 at 2:16 AM, Subacini B subac...@gmail.com wrote: Hi All, We have filenames with a timestamp, say ABC_1421893256000.txt, and the timestamp needs to be extracted from the file name for further processing. Is there a way to get the input file name picked up by the Spark Streaming job? Thanks in advance Subacini -- Emre Sevinc -- Emre Sevinc
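A rough Java sketch of the regex idea above (an untested illustration; it assumes file paths appear as whitespace-delimited "file:..." or "hdfs:..." tokens in the debug string, and that the timestamp sits between an underscore and the extension, as in ABC_1421893256000.txt):

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Inside foreachRDD: collect the input file paths mentioned in the RDD's lineage
String debugString = rdd.toDebugString();
List<String> fileNames = new ArrayList<String>();
Matcher m = Pattern.compile("(file:|hdfs:)\\S+").matcher(debugString);
while (m.find()) {
  fileNames.add(m.group());
}

// e.g. ABC_1421893256000.txt -> 1421893256000
for (String name : fileNames) {
  Matcher ts = Pattern.compile("_(\\d+)\\.").matcher(name);
  if (ts.find()) {
    String timestamp = ts.group(1);
    // further processing with the timestamp...
  }
}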
Re: Can you add Big Industries to the Powered by Spark page?
Hello, Thanks for adding, but URL seems to have a typo: when I click it tries to open http//www.bigindustries.be/ But it should be: http://www.bigindustries.be/ Kind regards, Emre Sevinç http://http//www.bigindustries.be/ On Feb 25, 2015 12:29 AM, Patrick Wendell pwend...@gmail.com wrote: I've added it, thanks! On Fri, Feb 20, 2015 at 12:22 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, Could you please add Big Industries to the Powered by Spark page at https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ? Company Name: Big Industries URL: http://http://www.bigindustries.be/ Spark Components: Spark Streaming Use Case: Big Content Platform Summary: The Big Content Platform is a business-to-business content asset management service providing a searchable, aggregated source of live news feeds, public domain media and archives of content. The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the platform leverages public datasets like Freebase, DBpedia, Wiktionary, and Geonames to support semantic text enrichment. Kind regards, Emre Sevinç http://www.bigindustries.be/
Re: Where to look for potential causes for Akka timeout errors in a Spark Streaming Application?
Hello Todd, Thank you for your suggestion! I have first tried increasing the Driver memory to 2G and it worked without any problems, but I will also test with the parameters and values you've shared. Kind regards, Emre Sevinç http://www.bigindustries.be/ On Fri, Feb 20, 2015 at 3:25 PM, Todd Nist tsind...@gmail.com wrote: Hi Emre, Have you tried adjusting these: .set(spark.akka.frameSize, 500).set(spark.akka.askTimeout, 30).set(spark.core.connection.ack.wait.timeout, 600) -Todd On Fri, Feb 20, 2015 at 8:14 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, We are building a Spark Streaming application that listens to a directory on HDFS, and uses the SolrJ library to send newly detected files to a Solr server. When we put 10.000 files to the directory it is listening to, it starts to process them by sending the files to our Solr server but after about a few thousand files the Spark Streaming application dies. Before the application dies, It gives some TimeoutException errors related to Akka, such as: util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] akka.pattern.AskTimeoutException: Timed out Any ideas on how to deal with this? Should we add/change/tweak some Spark configuration parameters? We're using Spark 1.2.0 on a YARN cluster, and we're giving 4 cores and 2GB of memory to that application when invoking it via spark-submit command. Below you can read the last few lines of the log file, showing what our Spark Streaming application logged just before it died: 15/02/20 13:28:25 INFO rdd.NewHadoopRDD: Input split: hdfs://node01.demo.hadoop:8020/user/bjorn/spark-belga/dbpedia-translator-out/2fdf95f1-67d6-40b7-9345-fe129e38a2d9.json:0+2620 15/02/20 13:28:25 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 3004 15/02/20 13:28:32 INFO storage.MemoryStore: ensureFreeSpace(20996) called with curMem=31171148, maxMem=794647 15/02/20 13:28:32 INFO storage.MemoryStore: Block broadcast_3004_piece0 stored as bytes in memory (estimated size 20.5 KB, free 1030.5 MB) 15/02/20 13:28:33 INFO storage.BlockManagerMaster: Updated info of block broadcast_3004_piece0 15/02/20 13:28:33 INFO broadcast.TorrentBroadcast: Reading broadcast variable 3004 took 7897 ms 15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(347363) called with curMem=31192144, maxMem=794647 15/02/20 13:28:33 INFO storage.MemoryStore: Block broadcast_3004 stored as values in memory (estimated size 339.2 KB, free 1030.2 MB) 15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(2627) called with curMem=31539507, maxMem=794647 15/02/20 13:28:33 INFO storage.MemoryStore: Block rdd_3659_3 stored as bytes in memory (estimated size 2.6 KB, free 1030.2 MB) 15/02/20 13:28:34 INFO storage.BlockManagerMaster: Updated info of block rdd_3659_3 15/02/20 13:28:34 INFO impl.HttpClientUtil: Creating new http client, config:maxConnections=128maxConnectionsPerHost=32followRedirects=false 15/02/20 13:28:36 INFO storage.MemoryStore: ensureFreeSpace(5) called with curMem=31542134, maxMem=794647 15/02/20 13:28:36 INFO storage.MemoryStore: Block rdd_3660_3 stored as bytes in memory (estimated size 5.0 B, free 1030.2 MB) 15/02/20 13:28:40 INFO storage.BlockManagerMaster: Updated info of block rdd_3660_3 15/02/20 13:28:40 INFO executor.Executor: Finished task 3.0 in stage 245.0 (TID 3455). 
2516 bytes result sent to driver 15/02/20 13:29:07 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/20 13:29:08 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkexecu...@node08.demo.hadoop:48042] - [akka.tcp://sparkdri...@node07.demo.hadoop:56535] disassociated! Shutting down. LogType: stdout LogLength: 0 Log Contents: Container: container_1422006251277_0837_01_04 on node08.demo.hadoop_8041 == LogType: stderr LogLength: 2952 Log Contents: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/mnt/disk1/cm/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/mnt/disk3/yarn/nm/usercache/bjorn/filecache/354/bigcontent-1.0
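For completeness, Todd's suggested settings written out as a SparkConf sketch (the values are the ones from his message, not verified tuning advice; in our case simply increasing the driver memory, e.g. submitting with --driver-memory 2g, was enough):

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .set("spark.akka.frameSize", "500")
    .set("spark.akka.askTimeout", "30")
    .set("spark.core.connection.ack.wait.timeout", "600");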
Re: Streaming Linear Regression
Hello Baris, Giving your complete source code (if not very long, or maybe via https://gist.github.com/) could be more helpful. Also telling which Spark version you use, on which file system, and how you run your application, together with the any log / output info it produces might make collective debugging relatively easier. -- Emre Sevinç http://www.bigindustries.be/ On Thu, Feb 19, 2015 at 9:01 PM, barisak baris.akg...@gmail.com wrote: Hi I tried to run Streaming Linear Regression in my local. val trainingData = ssc.textFileStream(/home/barisakgu/Desktop/Spark/train).map(LabeledPoint.parse) textFileStream is not seeing the new files. I search on the Internet, and I saw that somebody has same issue but no solution is found for that. Is there any opinion for this ? Is there any body who can achieve the running streaming linear regression ? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-tp21726.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Emre Sevinc
Can you add Big Industries to the Powered by Spark page?
Hello, Could you please add Big Industries to the Powered by Spark page at https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ? Company Name: Big Industries URL: http://http://www.bigindustries.be/ Spark Components: Spark Streaming Use Case: Big Content Platform Summary: The Big Content Platform is a business-to-business content asset management service providing a searchable, aggregated source of live news feeds, public domain media and archives of content. The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the platform leverages public datasets like Freebase, DBpedia, Wiktionary, and Geonames to support semantic text enrichment. Kind regards, Emre Sevinç http://www.bigindustries.be/
Where to look for potential causes for Akka timeout errors in a Spark Streaming Application?
Hello, We are building a Spark Streaming application that listens to a directory on HDFS, and uses the SolrJ library to send newly detected files to a Solr server. When we put 10.000 files to the directory it is listening to, it starts to process them by sending the files to our Solr server but after about a few thousand files the Spark Streaming application dies. Before the application dies, It gives some TimeoutException errors related to Akka, such as: util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] akka.pattern.AskTimeoutException: Timed out Any ideas on how to deal with this? Should we add/change/tweak some Spark configuration parameters? We're using Spark 1.2.0 on a YARN cluster, and we're giving 4 cores and 2GB of memory to that application when invoking it via spark-submit command. Below you can read the last few lines of the log file, showing what our Spark Streaming application logged just before it died: 15/02/20 13:28:25 INFO rdd.NewHadoopRDD: Input split: hdfs://node01.demo.hadoop:8020/user/bjorn/spark-belga/dbpedia-translator-out/2fdf95f1-67d6-40b7-9345-fe129e38a2d9.json:0+2620 15/02/20 13:28:25 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 3004 15/02/20 13:28:32 INFO storage.MemoryStore: ensureFreeSpace(20996) called with curMem=31171148, maxMem=794647 15/02/20 13:28:32 INFO storage.MemoryStore: Block broadcast_3004_piece0 stored as bytes in memory (estimated size 20.5 KB, free 1030.5 MB) 15/02/20 13:28:33 INFO storage.BlockManagerMaster: Updated info of block broadcast_3004_piece0 15/02/20 13:28:33 INFO broadcast.TorrentBroadcast: Reading broadcast variable 3004 took 7897 ms 15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(347363) called with curMem=31192144, maxMem=794647 15/02/20 13:28:33 INFO storage.MemoryStore: Block broadcast_3004 stored as values in memory (estimated size 339.2 KB, free 1030.2 MB) 15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(2627) called with curMem=31539507, maxMem=794647 15/02/20 13:28:33 INFO storage.MemoryStore: Block rdd_3659_3 stored as bytes in memory (estimated size 2.6 KB, free 1030.2 MB) 15/02/20 13:28:34 INFO storage.BlockManagerMaster: Updated info of block rdd_3659_3 15/02/20 13:28:34 INFO impl.HttpClientUtil: Creating new http client, config:maxConnections=128maxConnectionsPerHost=32followRedirects=false 15/02/20 13:28:36 INFO storage.MemoryStore: ensureFreeSpace(5) called with curMem=31542134, maxMem=794647 15/02/20 13:28:36 INFO storage.MemoryStore: Block rdd_3660_3 stored as bytes in memory (estimated size 5.0 B, free 1030.2 MB) 15/02/20 13:28:40 INFO storage.BlockManagerMaster: Updated info of block rdd_3660_3 15/02/20 13:28:40 INFO executor.Executor: Finished task 3.0 in stage 245.0 (TID 3455). 
2516 bytes result sent to driver 15/02/20 13:29:07 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/20 13:29:08 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkexecu...@node08.demo.hadoop:48042] - [akka.tcp://sparkdri...@node07.demo.hadoop:56535] disassociated! Shutting down. LogType: stdout LogLength: 0 Log Contents: Container: container_1422006251277_0837_01_04 on node08.demo.hadoop_8041 == LogType: stderr LogLength: 2952 Log Contents: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/mnt/disk1/cm/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/mnt/disk3/yarn/nm/usercache/bjorn/filecache/354/bigcontent-1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/02/20 13:29:26 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT] 15/02/20 13:29:27 INFO spark.SecurityManager: Changing view acls to: yarn,bjorn 15/02/20 13:29:27 INFO spark.SecurityManager: Changing modify acls to: yarn,bjorn 15/02/20 13:29:27 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, bjorn); users with modify permissions:
Re: Streaming Linear Regression
Baris, I've tried the following piece of code: https://gist.github.com/emres/10c509c1d69264fe6fdb and built it using sbt package and then submitted it via spark-submit --class org.apache.spark.examples.mllib.StreamingLinearRegression --master local[4] target/scala-2.10/streaminglinearregression_2.10-1.0.jar And once it started to run, I've waited for a few seconds, and then I've copied a few files to /home/emre/data/train And observed the log output on my console: 15/02/20 13:08:35 INFO FileInputDStream: Finding new files took 29 ms 15/02/20 13:08:35 INFO FileInputDStream: New files at time 1424434115000 ms: file:/home/emre/data/train/newsMessageNL14.json file:/home/emre/data/train/newsMessageNL11.json file:/home/emre/data/train/newsMessageNL10.json file:/home/emre/data/train/newsMessageNL6.json file:/home/emre/data/train/newsMessageNL8.json file:/home/emre/data/train/newsMessageNL5.json file:/home/emre/data/train/newsMessageNL1.json file:/home/emre/data/train/newsMessageNL9.json file:/home/emre/data/train/newsMessageNL2.json file:/home/emre/data/train/newsMessageNL16.json file:/home/emre/data/train/newsMessageNL20.json file:/home/emre/data/train/newsMessageNL12.json file:/home/emre/data/train/newsMessageNL4.json file:/home/emre/data/train/newsMessageNL19.json file:/home/emre/data/train/newsMessageNL7.json file:/home/emre/data/train/newsMessageNL17.json file:/home/emre/data/train/newsMessageNL18.json file:/home/emre/data/train/newsMessageNL3.json file:/home/emre/data/train/newsMessageNL13.json file:/home/emre/data/train/newsMessageNL15.json 15/02/20 13:08:35 INFO MemoryStore: ensureFreeSpace(214074) called with curMem=0, maxMem=278019440 The contents of JSON files of course don't make sense in this context (building a linear regression model), I've used them only to check whether the system detects new files, and as can be seen above, it does. You can start from the source code I've shared, which is detecting new files, and continue to build your particular streaming linear regression application. -- Emre Sevinç http://www.bigindustries.be On Thu, Feb 19, 2015 at 9:01 PM, barisak baris.akg...@gmail.com wrote: Hi I tried to run Streaming Linear Regression in my local. val trainingData = ssc.textFileStream(/home/barisakgu/Desktop/Spark/train).map(LabeledPoint.parse) textFileStream is not seeing the new files. I search on the Internet, and I saw that somebody has same issue but no solution is found for that. Is there any opinion for this ? Is there any body who can achieve the running streaming linear regression ? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-tp21726.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Emre Sevinc
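For completeness, the directory-watching part on its own needs very little code. A minimal Java sketch (class name, batch interval and local[4] master are illustrative, not taken from the gist):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DirectoryWatcher {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("DirectoryWatcher").setMaster("local[4]");
    // 30-second batches; textFileStream only picks up files that appear in the
    // directory (created or atomically moved there) after the stream has started.
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(30000));
    JavaDStream<String> lines = jssc.textFileStream("/home/emre/data/train");
    // An output operation is required, otherwise no processing is triggered.
    lines.print();
    jssc.start();
    jssc.awaitTermination();
  }
}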
In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and
Hello, We have a Spark Streaming application that watches an input directory, and as files are copied there the application reads them and sends the contents to a RESTful web service, receives a response and write some contents to an output directory. When testing the application by copying a few thousand files at once to its input directory, we have realized that after having processed about 3800 files, it creates messages as the following in the log file: 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size 9960 dropped from memory (free 447798720) 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) and then the Spark Streaming application dies. What might be the potential causes to check for such errors? Below you can see last few lines before it dies: 15/02/19 10:22:03 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 12894 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(20978) called with curMem=107884847, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894_piece0 stored as bytes in memory (estimated size 20.5 KB, free 427.4 MB) 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block broadcast_12894_piece0 15/02/19 10:22:04 INFO broadcast.TorrentBroadcast: Reading broadcast variable 12894 took 460 ms 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(347363) called with curMem=107905825, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894 stored as values in memory (estimated size 339.2 KB, free 427.0 MB) 15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(1079) called with curMem=108253188, maxMem=556038881 15/02/19 10:22:04 INFO storage.MemoryStore: Block rdd_30466_35 stored as bytes in memory (estimated size 1079.0 B, free 427.0 MB) 15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block rdd_30466_35 15/02/19 10:22:05 INFO storage.MemoryStore: ensureFreeSpace(5) called with curMem=108254267, maxMem=556038881 15/02/19 10:22:05 INFO storage.MemoryStore: Block rdd_30467_35 stored as bytes in memory (estimated size 5.0 B, free 427.0 MB) 15/02/19 10:22:05 INFO storage.BlockManagerMaster: Updated info of block rdd_30467_35 15/02/19 10:22:05 INFO executor.Executor: Finished task 35.0 in stage 351.0 (TID 12229). 
2353 bytes result sent to driver 15/02/19 10:22:06 INFO storage.BlockManager: Removing broadcast 17935 15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935_piece0 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935_piece0 of size 4151 dropped from memory (free 447788760) 15/02/19 10:22:06 INFO storage.BlockManagerMaster: Updated info of block broadcast_17935_piece0 15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935 15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size 9960 dropped from memory (free 447798720) 15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/19 10:23:28 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398) 15/02/19 10:24:01 WARN util.AkkaUtils: Error sending message in 3 attempts
Re: In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and
On Thu, Feb 19, 2015 at 12:27 PM, Tathagata Das t...@databricks.com wrote: What version of Spark are you using? TD Spark version is 1.2.0 (running on Cloudera CDH 5.3.0) -- Emre Sevinç
Re: Spark Streaming output cannot be used as input?
Hello Jose, We've hit the same issue a couple of months ago. It is possible to write directly to files instead of creating directories, but it is not straightforward, and I haven't seen any clear demonstration in books, tutorials, etc. We do something like: SparkConf sparkConf = new SparkConf().setAppName(appName); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(batchInterval)); JavaDStreamString stream = MyModuleApp.initializeJob(ssc); MyModuleApp.process(stream); And then in the process method: @Override public void process(JavaDStreamString inStream) { JavaDStreamString json = inStream.map(new MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir)); forceOutput(json); } This, in turn, calls the following (I've removed the irrelevant lines to focus on writing): public class MyModuleWorker implements FunctionString,String { public String call(String json) { // process the data and then write it writeJSON(json, validatedJSONoutputDir_); }} And the writeJSON method is: public static final void writeJSON(String json, String jsonDirPath) throws IOException {String jsonFileName = jsonDirPath + / + UUID.randomUUID().toString() + .json.tmp;URI uri = URI.create(jsonFileName);Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(uri, conf); FSDataOutputStream out = fileSystem.create(new Path(uri)); out.write(json.getBytes(StandardCharsets.UTF_8));out.close(); fileSystem.rename(new Path(uri),new Path(URI.create(jsonDirPath + / + UUID.randomUUID().toString() + .json))); } Using a similar technique you might be able to achieve your objective. Kind regards, Emre Sevinç http://www.bigindustries.be/ On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez jfernan...@sdl.com wrote: Hello folks, Our intended use case is: - Spark Streaming app #1 reads from RabbitMQ and output to HDFS - Spark Streaming app #2 reads #1’s output and stores the data into Elasticsearch The idea behind this architecture is that if Elasticsearch is down due to an upgrade or system error we don’t have to stop reading messages from the queue. We could also scale each process separately as needed. After a few hours research my understanding is that Spark Streaming outputs files in a *directory* for which you provide the prefix and suffix. This is despite the ScalaDoc for DStream saveAsObjectFiles suggesting otherwise: /** * Save each RDD in this DStream as a Sequence file of serialized objects. * The file name at each batch interval is generated based on `prefix` and * `suffix`: prefix-TIME_IN_MS.suffix. */ Spark Streaming can monitor an HDFS directory for files but subfolders are not supported. So as far as I can tell, it is not possible to use Spark Streaming output as input for a different Spark Streaming app without somehow performing a separate operation in the middle. Am I missing something obvious? I’ve read some suggestions like using Hadoop to merge the directories (whose names I don’t see how you would know) and to reduce the partitions to 1 (which wouldn’t help). Any other suggestions? What is the expected pattern a developer would follow that would make Spark Streaming’s output format usable? www.sdl.com http://www.sdl.com/?utm_source=Emailutm_medium=Email%2BSignatureutm_campaign=SDL%2BStandard%2BEmail%2BSignature *SDL PLC confidential, all rights reserved.* If you are not the intended recipient of this mail SDL requests and requires that you delete it without acting upon or copying any of its contents, and we further request that you advise us. 
-- Emre Sevinc
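For readability, here is a self-contained sketch of that write-then-rename approach (the class name is illustrative and error handling is kept minimal; it mirrors the writeJSON method above):

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class JsonFileWriter {
  // Writes the JSON string to <UUID>.json.tmp first, then renames it to a
  // final <UUID>.json name, so that a process watching the output directory
  // never sees half-written files.
  public static void writeJSON(String json, String jsonDirPath) throws IOException {
    String tmpName = jsonDirPath + "/" + UUID.randomUUID().toString() + ".json.tmp";
    URI uri = URI.create(tmpName);
    Configuration conf = new Configuration();
    FileSystem fileSystem = FileSystem.get(uri, conf);
    FSDataOutputStream out = fileSystem.create(new Path(uri));
    out.write(json.getBytes(StandardCharsets.UTF_8));
    out.close();
    fileSystem.rename(
        new Path(uri),
        new Path(URI.create(jsonDirPath + "/" + UUID.randomUUID().toString() + ".json")));
  }
}

The .tmp suffix plus the final rename is what makes the output usable as input for another streaming job: the watched directory only ever contains complete files whose names end in .json.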
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Hello Imran, (a) I know that all 20 files are processed when I use foreachRDD, because I can see the processed files in the output directory. (My application logic writes them to an output directory after they are processed, *but* that writing operation does not happen in foreachRDD, below you can see the URL that includes my code and clarifies this). (b) I know only 16 files are processed because in the output directory I see only 16 files processed. I wait for minutes and minutes and no more files appear in the output directory. When I see only 16 files are processed and Spark Streaming went to the mode of idly watching the input directory, and then if I copy a few more files, they are also processed. (c) Sure, you can see part of my code in the following gist: https://gist.github.com/emres/0fb6de128baea099e741 It might seem a little convoluted at first, because my application is divided into two classes, a Driver class (setting up things and initializing them), and a Worker class (that implements the core functionality). I've also put the relevant methods from the my utility classes for completeness. I am as perplexed as you are as to why forcing the output via foreachRDD ended up in different behaviour compared to simply using print() method. Kind regards, Emre On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras...@cloudera.com wrote: Hi Emre, there shouldn't be any difference in which files get processed w/ print() vs. foreachRDD(). In fact, if you look at the definition of print(), it is just calling foreachRDD() underneath. So there is something else going on here. We need a little more information to figure out exactly what is going on. (I think Sean was getting at the same thing ...) (a) how do you know that when you use foreachRDD, all 20 files get processed? (b) How do you know that only 16 files get processed when you print()? Do you know the other files are being skipped, or maybe they are just stuck somewhere? eg., suppose you start w/ 20 files, and you see 16 get processed ... what happens after you add a few more files to the directory? Are they processed immediately, or are they never processed either? (c) Can you share any more code of what you are doing to the dstreams *before* the print() / foreachRDD()? That might give us more details about what the difference is. I can't see how .count.println() would be different than just println(), but maybe I am missing something also. Imran On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com wrote: Sean, In this case, I've been testing the code on my local machine and using Spark locally, so I all the log output was available on my terminal. And I've used the .print() method to have an output operation, just to force Spark execute. And I was not using foreachRDD, I was only using print() method on a JavaDStream object, and it was working fine for a few files, up to 16 (and without print() it did not do anything because there were no output operations). To sum it up, in my case: - Initially, use .print() and no foreachRDD: processes up to 16 files and does not do anything for the remaining 4. - Remove .print() and use foreachRDD: processes all of the 20 files. Maybe, as in Akhil Das's suggestion, using .count.print() might also have fixed my problem, but I'm satisfied with foreachRDD approach for now. 
(Though it is still a mystery to me why using .print() had a difference, maybe my mental model of Spark is wrong, I thought no matter what output operation I used, the number of files processed by Spark would be independent of that because the processing is done in a different method, .print() is only used to force Spark execute that processing, am I wrong?). -- Emre On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote: Materialization shouldn't be relevant. The collect by itself doesn't let you detect whether it happened. Print should print some results to the console but on different machines, so may not be a reliable way to see what happened. Yes I understand your real process uses foreachRDD and that's what you should use. It sounds like that works. But you must always have been using that right? What do you mean that you changed to use it? Basically I'm not clear on what the real code does and what about the output of that code tells you only 16 files were processed. On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Sean, I did not understand your question very well, but what I do is checking the output directory (and I have various logger outputs at various stages showing the contents of an input file being processed, the response from the web service, etc.). By the way, I've already solved my problem by using foreachRDD instead of print (see my second message in this thread). Apparently forcing Spark to materialize DAG via print() is not the way to go. (My interpretation might
Re: Re: Problem with 1 master + 2 slaves cluster
On Wed, Feb 18, 2015 at 10:23 AM, bit1...@163.com bit1...@163.com wrote: Sure, thanks Akhil. A further question: Is the local file system (file:///) not supported in a standalone cluster? FYI: I'm able to write to the local file system (via the HDFS API and the file:/// notation) when using Spark. -- Emre Sevinç http://www.bigindustries.be/
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
Thanks to everyone for suggestions and explanations. Currently I've started to experiment with the following scenario, that seems to work for me: - Put the properties file on a web server so that it is centrally available - Pass it to the Spark driver program via --conf 'propertiesFile=http: //myWebServer.com/mymodule.properties' - And then load the configuration using Apache Commons Configuration: PropertiesConfiguration config = new PropertiesConfiguration(); config.load(System.getProperty(propertiesFile)); Using the method described above, I don't need to statically compile my properties file into the über JAR anymore, I can modify the file on the web server and when I submit my application via spark-submit, passing the URL of the properties file, the driver program reads the contents of that file for once, retrieves the values of the keys and continues. PS: I've opted for Apache Commons Configuration because it is already part of the many dependencies I have in my pom.xml, and I did not want to pull another library, even though I Typesafe Config library seems to be a powerful and flexible choice, too. -- Emre On Tue, Feb 17, 2015 at 6:12 PM, Charles Feduke charles.fed...@gmail.com wrote: Emre, As you are keeping the properties file external to the JAR you need to make sure to submit the properties file as an additional --files (or whatever the necessary CLI switch is) so all the executors get a copy of the file along with the JAR. If you know you are going to just put the properties file on HDFS then why don't you define a custom system setting like properties.url and pass it along: (this is for Spark shell, the only CLI string I have available at the moment:) spark-shell --jars $JAR_NAME \ --conf 'properties.url=hdfs://config/stuff.properties' \ --conf 'spark.executor.extraJavaOptions=-Dproperties.url=hdfs://config/stuff.properties' ... then load the properties file during initialization by examining the properties.url system setting. I'd still strongly recommend Typesafe Config as it makes this a lot less painful, and I know for certain you can place your *.conf at a URL (using the -Dconfig.url=) though it probably won't work with an HDFS URL. On Tue Feb 17 2015 at 9:53:08 AM Gerard Maas gerard.m...@gmail.com wrote: +1 for TypeSafe config Our practice is to include all spark properties under a 'spark' entry in the config file alongside job-specific configuration: A config file would look like: spark { master = cleaner.ttl = 123456 ... } job { context { src = foo action = barAction } prop1 = val1 } Then, to create our Spark context, we transparently pass the spark section to a SparkConf instance. This idiom will instantiate the context with the spark specific configuration: sparkConfig.setAll(configToStringSeq(config.getConfig(spark).atPath(spark))) And we can make use of the config object everywhere else. We use the override model of the typesafe config: reasonable defaults go in the reference.conf (within the jar). Environment-specific overrides go in the application.conf (alongside the job jar) and hacks are passed with -Dprop=value :-) -kr, Gerard. On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've decided to try spark-submit ... 
--conf spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties But when I try to retrieve the value of propertiesFile via System.err.println(propertiesFile : + System.getProperty(propertiesFile)); I get NULL: propertiesFile : null Interestingly, when I run spark-submit with --verbose, I see that it prints: spark.driver.extraJavaOptions - -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties I couldn't understand why I couldn't get to the value of propertiesFile by using standard System.getProperty method. (I can use new SparkConf().get(spark.driver.extraJavaOptions) and manually parse it, and retrieve the value, but I'd like to know why I cannot retrieve that value using System.getProperty method). Any ideas? If I can achieve what I've described above properly, I plan to pass a properties file that resides on HDFS, so that it will be available to my driver program wherever that program runs. -- Emre On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com wrote: I haven't actually tried mixing non-Spark settings into the Spark properties. Instead I package my properties into the jar and use the Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala specific) to get at my properties: Properties file: src/main/resources/integration.conf (below $ENV might be set to either integration or prod[3]) ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \ --conf 'config.resource=$ENV.conf' \ --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf
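Spelled out, the loading side of that setup is only a few lines (Commons Configuration 1.x assumed; the propertiesFile key and the job.output.dir property are just the examples used above, and the key is expected to be visible as a JVM system property on the driver):

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;

public final class ExternalProperties {
  // Loads the .properties file whose location (a local path or an http:// URL)
  // is given in the "propertiesFile" system property.
  public static PropertiesConfiguration load() throws ConfigurationException {
    PropertiesConfiguration config = new PropertiesConfiguration();
    config.load(System.getProperty("propertiesFile"));
    return config;
  }

  public static void main(String[] args) throws ConfigurationException {
    System.out.println(load().getString("job.output.dir"));
  }
}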
[POWERED BY] Can you add Big Industries to the Powered by Spark page?
Hello, Could you please add Big Industries to the Powered by Spark page at https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ? Company Name: Big Industries URL: http://www.bigindustries.be/ Spark Components: Spark Streaming Use Case: Big Content Platform Summary: The Big Content Platform is a business-to-business content asset management service providing a searchable, aggregated source of live news feeds, public domain media and archives of content. The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the platform leverages public datasets like Freebase, DBpedia, Wiktionary, and Geonames to support semantic text enrichment. Kind regards, Emre Sevinç http://www.bigindustries.be/
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
Hello Dmitry, I had almost the same problem and solved it by using version 4.0.0 of SolrJ: dependency groupIdorg.apache.solr/groupId artifactIdsolr-solrj/artifactId version4.0.0/version /dependency In my case, I was lucky that version 4.0.0 of SolrJ had all the functionality I needed. -- Emre Sevinç http://www.bigindustries.be/ On Wed, Feb 18, 2015 at 4:39 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I think I'm going to have to rebuild Spark with commons.httpclient.version set to 4.3.1 which looks to be the version chosen by Solrj, rather than the 4.2.6 that Spark's pom mentions. Might work. On Wed, Feb 18, 2015 at 1:37 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi Did you try to make maven pick the latest version http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management That way solrj won't cause any issue, you can try this and check if the part of your code where you access HDFS works fine? On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm getting the below error when running spark-submit on my class. This class has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ 4.10.3 from within the class. This is in conflict with the older version, HttpClient 3.1 that's a dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4). I've tried setting spark.files.userClassPathFirst to true in SparkConf in my program, also setting it to true in $SPARK-HOME/conf/spark-defaults.conf as spark.files.userClassPathFirst true No go, I'm still getting the error, as below. Is there anything else I can try? Are there any plans in Spark to support multiple class loaders? Exception in thread main java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) ... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Emre Sevinc
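Written out as a pom.xml snippet, that version pin is simply:

<dependency>
  <groupId>org.apache.solr</groupId>
  <artifactId>solr-solrj</artifactId>
  <version>4.0.0</version>
</dependency>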
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
On Wed, Feb 18, 2015 at 4:54 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thank you, Emre. It seems solrj still depends on HttpClient 4.1.3; would that not collide with Spark/Hadoop's default dependency on HttpClient set to 4.2.6? If that's the case that might just solve the problem. Would Solrj 4.0.0 work with the latest Solr, 4.10.3? In my case, it worked; I mean I was trying to send some documents to the latest version of Solr server (v4.10.3), and using v4.0.0 of SolrJ worked without any problems so far. I couldn't find any other way to deal with this old httpclient dependency problem in Spark. -- Emre Sevinç http://www.bigindustries.be/
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
I've decided to try spark-submit ... --conf spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties But when I try to retrieve the value of propertiesFile via System.err.println(propertiesFile : + System.getProperty(propertiesFile)); I get NULL: propertiesFile : null Interestingly, when I run spark-submit with --verbose, I see that it prints: spark.driver.extraJavaOptions - -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties I couldn't understand why I couldn't get to the value of propertiesFile by using standard System.getProperty method. (I can use new SparkConf().get(spark.driver.extraJavaOptions) and manually parse it, and retrieve the value, but I'd like to know why I cannot retrieve that value using System.getProperty method). Any ideas? If I can achieve what I've described above properly, I plan to pass a properties file that resides on HDFS, so that it will be available to my driver program wherever that program runs. -- Emre On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com wrote: I haven't actually tried mixing non-Spark settings into the Spark properties. Instead I package my properties into the jar and use the Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala specific) to get at my properties: Properties file: src/main/resources/integration.conf (below $ENV might be set to either integration or prod[3]) ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \ --conf 'config.resource=$ENV.conf' \ --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf' Since the properties file is packaged up with the JAR I don't have to worry about sending the file separately to all of the slave nodes. Typesafe Config is written in Java so it will work if you're not using Scala. (The Typesafe Config also has the advantage of being extremely easy to integrate with code that is using Java Properties today.) If you instead want to send the file separately from the JAR and you use the Typesafe Config library, you can specify config.file instead of .resource; though I'd point you to [3] below if you want to make your development life easier. 1. https://github.com/typesafehub/config 2. https://github.com/ceedubs/ficus 3. http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/ On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties, as well as Spark properties, e.g.: job.output.dir=file:///home/emre/data/mymodule/out I'm trying to pass it to spark-submit via: spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar And I thought I could read the value of my non-Spark property, namely, job.output.dir by using: SparkConf sparkConf = new SparkConf(); final String validatedJSONoutputDir = sparkConf.get(job.output.dir); But it gives me an exception: Exception in thread main java.util.NoSuchElementException: job.output.dir Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file and then get the values of those non-Spark properties via SparkConf? Or is there another object / method to retrieve the values for those non-Spark properties? -- Emre Sevinç -- Emre Sevinc
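One thing that may be worth trying here (an assumption, not something verified in this thread): in client mode the driver JVM has already been launched by the time spark.driver.extraJavaOptions is read from the configuration, so a -D flag passed that way never reaches the driver process. spark-submit has a dedicated --driver-java-options switch that the launcher applies to the driver JVM itself:

spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose \
  --driver-java-options "-DpropertiesFile=/home/emre/data/myModule.properties" \
  mymodule.jar

With that, System.getProperty("propertiesFile") should return the expected value in the driver program.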
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
I've managed to solve this, but I still don't know exactly why my solution works: In my code I was trying to force the Spark to output via: jsonIn.print(); jsonIn being a JavaDStreamString. When removed the code above, and added the code below to force the output operation, hence the execution: jsonIn.foreachRDD(new FunctionJavaRDDString, Void() { @Override public Void call(JavaRDDString stringJavaRDD) throws Exception { stringJavaRDD.collect(); return null; } }); It works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory process it. - Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory - batch interval : 30 seconds - checkpoint interval: 150 seconds When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like: spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out But then I've realized something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*. And the remaining 4 are not processed at all. I've tried it with 19, 18, and then 17 files. Same result, only 16 files end up in the output directory. Then I've tried it by copying 16 files at once to the input directory, and it can process all of the 16 files. That's why I call it magic number 16. When I mean it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files: === 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0) 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs: file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861 --- Time: 142408626 ms --- 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO
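For reference, the forcing-the-output idiom from this thread written out in full, with jsonIn being the JavaDStream<String> mentioned above (needs org.apache.spark.api.java.JavaRDD and org.apache.spark.api.java.function.Function imported):

jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
    // collect() is used only to force the RDD to be computed;
    // the collected result itself is discarded.
    stringJavaRDD.collect();
    return null;
  }
});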
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Sean, In this case, I've been testing the code on my local machine and using Spark locally, so I all the log output was available on my terminal. And I've used the .print() method to have an output operation, just to force Spark execute. And I was not using foreachRDD, I was only using print() method on a JavaDStream object, and it was working fine for a few files, up to 16 (and without print() it did not do anything because there were no output operations). To sum it up, in my case: - Initially, use .print() and no foreachRDD: processes up to 16 files and does not do anything for the remaining 4. - Remove .print() and use foreachRDD: processes all of the 20 files. Maybe, as in Akhil Das's suggestion, using .count.print() might also have fixed my problem, but I'm satisfied with foreachRDD approach for now. (Though it is still a mystery to me why using .print() had a difference, maybe my mental model of Spark is wrong, I thought no matter what output operation I used, the number of files processed by Spark would be independent of that because the processing is done in a different method, .print() is only used to force Spark execute that processing, am I wrong?). -- Emre On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote: Materialization shouldn't be relevant. The collect by itself doesn't let you detect whether it happened. Print should print some results to the console but on different machines, so may not be a reliable way to see what happened. Yes I understand your real process uses foreachRDD and that's what you should use. It sounds like that works. But you must always have been using that right? What do you mean that you changed to use it? Basically I'm not clear on what the real code does and what about the output of that code tells you only 16 files were processed. On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Sean, I did not understand your question very well, but what I do is checking the output directory (and I have various logger outputs at various stages showing the contents of an input file being processed, the response from the web service, etc.). By the way, I've already solved my problem by using foreachRDD instead of print (see my second message in this thread). Apparently forcing Spark to materialize DAG via print() is not the way to go. (My interpretation might be wrong, but this is what I've just seen in my case). -- Emre On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote: How are you deciding whether files are processed or not? It doesn't seem possible from this code. Maybe it just seems so. On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've managed to solve this, but I still don't know exactly why my solution works: In my code I was trying to force the Spark to output via: jsonIn.print(); jsonIn being a JavaDStreamString. When removed the code above, and added the code below to force the output operation, hence the execution: jsonIn.foreachRDD(new FunctionJavaRDDString, Void() { @Override public Void call(JavaRDDString stringJavaRDD) throws Exception { stringJavaRDD.collect(); return null; } }); It works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory process it. 
- Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory - batch interval : 30 seconds - checkpoint interval: 150 seconds When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like: spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out But then I've realized something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*. And the remaining 4 are not processed at all. I've tried it with 19, 18, and then 17 files. Same result, only 16 files end up in the output directory. Then I've tried it by copying 16 files at once to the input directory, and it can process all of the 16 files. That's why I call it magic number 16. When I mean it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files: === 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G 2015-02-16 12:30:51 WARN
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Hello Sean, I did not understand your question very well, but what I do is checking the output directory (and I have various logger outputs at various stages showing the contents of an input file being processed, the response from the web service, etc.). By the way, I've already solved my problem by using foreachRDD instead of print (see my second message in this thread). Apparently forcing Spark to materialize DAG via print() is not the way to go. (My interpretation might be wrong, but this is what I've just seen in my case). -- Emre On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote: How are you deciding whether files are processed or not? It doesn't seem possible from this code. Maybe it just seems so. On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've managed to solve this, but I still don't know exactly why my solution works: In my code I was trying to force the Spark to output via: jsonIn.print(); jsonIn being a JavaDStreamString. When removed the code above, and added the code below to force the output operation, hence the execution: jsonIn.foreachRDD(new FunctionJavaRDDString, Void() { @Override public Void call(JavaRDDString stringJavaRDD) throws Exception { stringJavaRDD.collect(); return null; } }); It works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory process it. - Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory - batch interval : 30 seconds - checkpoint interval: 150 seconds When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like: spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out But then I've realized something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*. And the remaining 4 are not processed at all. I've tried it with 19, 18, and then 17 files. Same result, only 16 files end up in the output directory. Then I've tried it by copying 16 files at once to the input directory, and it can process all of the 16 files. That's why I call it magic number 16. When I mean it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files: === 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0) 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs: file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861 --- Time: 142408626 ms --- 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO
Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties, as well as Spark properties, e.g.: job.output.dir=file:///home/emre/data/mymodule/out I'm trying to pass it to spark-submit via: spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar And I thought I could read the value of my non-Spark property, namely, job.output.dir by using: SparkConf sparkConf = new SparkConf(); final String validatedJSONoutputDir = sparkConf.get(job.output.dir); But it gives me an exception: Exception in thread main java.util.NoSuchElementException: job.output.dir Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file and then get the values of those non-Spark properties via SparkConf? Or is there another object / method to retrieve the values for those non-Spark properties? -- Emre Sevinç
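One workaround to experiment with (suggested here as an assumption, not something confirmed in this thread): spark-submit only forwards keys starting with spark. from a --properties-file, so namespacing your own settings under that prefix lets you read them back from SparkConf, e.g.:

# in mymodule.properties
spark.myapp.job.output.dir=file:///home/emre/data/mymodule/out

// in the driver program
SparkConf sparkConf = new SparkConf();
final String validatedJSONoutputDir = sparkConf.get("spark.myapp.job.output.dir");

The spark.myapp. prefix is made up; any key that begins with spark. would be preserved.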
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
Sean, I'm trying this as an alternative to what I currently do. Currently I have my module.properties file for my module in the resources directory, and that file is put inside the über JAR file when I build my application with Maven, and then when I submit it using spark-submit, I can read that module.properties file via the traditional method: properties.load(MyModule.class.getClassLoader().getResourceAsStream(module.properties)); and everything works fine. The disadvantage is that in order to make any changes to that .properties file effective, I have to re-build my application. Therefore I'm trying to find a way to be able to send that module.properties file via spark-submit and read the values in it, so that I will not be forced to build my application every time I want to make a change in the module.properties file. I've also checked the --files option of spark-submit, but I see that it is for sending the listed files to executors (correct me if I'm wrong); what I'm after is being able to pass dynamic properties (key/value pairs) to the Driver program of my Spark application. And I still could not find out how to do that. -- Emre On Mon, Feb 16, 2015 at 4:28 PM, Sean Owen so...@cloudera.com wrote: Since SparkConf is only for Spark properties, I think it will in general only pay attention to and preserve spark.* properties. You could experiment with that. In general I wouldn't rely on Spark mechanisms for your configuration, and you can use any config mechanism you like to retain your own properties. On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties, as well as Spark properties, e.g.: job.output.dir=file:///home/emre/data/mymodule/out I'm trying to pass it to spark-submit via: spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar And I thought I could read the value of my non-Spark property, namely, job.output.dir by using: SparkConf sparkConf = new SparkConf(); final String validatedJSONoutputDir = sparkConf.get(job.output.dir); But it gives me an exception: Exception in thread main java.util.NoSuchElementException: job.output.dir Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file and then get the values of those non-Spark properties via SparkConf? Or is there another object / method to retrieve the values for those non-Spark properties? -- Emre Sevinç -- Emre Sevinc
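Spelled out, the classpath-based loading mentioned above amounts to (with java.util.Properties imported):

Properties properties = new Properties();
properties.load(MyModule.class.getClassLoader().getResourceAsStream("module.properties"));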
Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory process it. - Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory - batch interval : 30 seconds - checkpoint interval: 150 seconds When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like: spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out But then I've realized something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*. And the remaining 4 are not processed at all. I've tried it with 19, 18, and then 17 files. Same result, only 16 files end up in the output directory. Then I've tried it by copying 16 files at once to the input directory, and it can process all of the 16 files. That's why I call it magic number 16. When I mean it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files: === 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0) 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs: file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861 --- Time: 142408626 ms --- 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 
- Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408599: 2015-02-16 12:31:31 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408599 --- Time: 142408629 ms ---
Documentation error in MLlib - Clustering?
Hello, I was trying the streaming kmeans clustering example in the official documentation at: http://spark.apache.org/docs/1.2.0/mllib-clustering.html But I got a type error when I tried to compile the code:

[error] found   : org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]
[error] required: org.apache.spark.streaming.dstream.DStream[(?, org.apache.spark.mllib.linalg.Vector)]
[error] model.predictOnValues(testData).print()
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed

And it seems like the solution is to use model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() as shown in https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala instead of model.predictOnValues(testData).print() as written in the documentation. I just wanted to draw attention to this, so that one of the maintainers can fix the documentation. -- Emre Sevinç
Re: How to log using log4j to local file system inside a Spark application that runs on YARN?
Marcelo and Burak, Thank you very much for your explanations. Now I'm able to see my logs. On Wed, Feb 11, 2015 at 7:52 PM, Marcelo Vanzin van...@cloudera.com wrote: For Yarn, you need to upload your log4j.properties separately from your app's jar, because of some internal issues that are too boring to explain here. :-) Basically: spark-submit --master yarn --files log4j.properties blah blah blah Having to keep it outside your app jar is sub-optimal, and I think there's a bug filed to fix this, but so far no one has really spent time looking at it. On Wed, Feb 11, 2015 at 4:29 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm building an Apache Spark Streaming application and cannot make it log to a file on the local filesystem when running it on YARN. How can achieve this? I've set log4.properties file so that it can successfully write to a log file in /tmp directory on the local file system (shown below partially): log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=/tmp/application.log log4j.appender.file.append=false log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n When I run my Spark application locally by using the following command: spark-submit --class myModule.myClass --master local[2] --deploy-mode client myApp.jar It runs fine and I can see that log messages are written to /tmp/application.log on my local file system. But when I run the same application via YARN, e.g. spark-submit --class myModule.myClass --master yarn-client --name myModule --total-executor-cores 1 --executor-memory 1g myApp.jar or spark-submit --class myModule.myClass --master yarn-cluster --name myModule --total-executor-cores 1 --executor-memory 1g myApp.jar I cannot see any /tmp/application.log on the local file system of the machine that runs YARN. What am I missing? -- Emre Sevinç -- Marcelo -- Emre Sevinc
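Putting Marcelo's advice together with the yarn-cluster invocation from the question, the submission ends up looking roughly like this (assuming log4j.properties sits in the directory you submit from):

spark-submit --class myModule.myClass --master yarn-cluster --name myModule \
  --total-executor-cores 1 --executor-memory 1g \
  --files log4j.properties \
  myApp.jar

Keep in mind that /tmp/application.log is then created on whichever cluster node each container happens to run on, not on the machine you submit from.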
Re: Get filename in Spark Streaming
Hello, Did you check the following? http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/ http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-td6551.html -- Emre Sevinç On Fri, Feb 6, 2015 at 2:16 AM, Subacini B subac...@gmail.com wrote: Hi All, We have filename with timestamp say ABC_1421893256000.txt and the timestamp needs to be extracted from file name for further processing.Is there a way to get input file name picked up by spark streaming job? Thanks in advance Subacini -- Emre Sevinc
Re: How to define a file filter for file name patterns in Apache Spark Streaming in Java?
Hello Akhil, Thank you for taking your time for a detailed answer. I managed to solve it in a very similar manner. Kind regards, Emre Sevinç On Mon, Feb 2, 2015 at 8:22 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Emre, This is how you do that in scala: val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](/home/akhld/sigmoid, (t: Path) = true, true) In java you can do something like: jssc.ssc().LongWritable, Text, SequenceFileInputFormatfileStream(/home/akhld/sigmoid, new AbstractFunction1Path, Object() { @Override public Boolean apply(Path input) { //file filtering logic here return true; } }, true, ClassTag$.MODULE$.apply(LongWritable.class), ClassTag$.MODULE$.apply(Text.class), ClassTag$.MODULE$.apply(SequenceFileInputFormat.class)); Thanks Best Regards On Mon, Feb 2, 2015 at 6:34 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Apache Spark Streaming 1.2.0 and trying to define a file filter for file names when creating an InputDStream https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html by invoking the fileStream https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 method. My code is working perfectly fine when I don't use a file filter, e.g. by invoking the other fileStream https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 method (described here https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29: https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 ). According to the documentation of *fileStream* method, I can pass it scala.Function1org.apache.hadoop.fs.Path,Object filter But so far, I could not create a fileFilter. My initial attempts have been 1- Tried to implement it as: Function1Path, Object fileFilter = new Function1Path, Object() { @Override public Object apply(Path v1) { return true; } @Override public A Function1A, Object compose(Function1A, Path g) { return Function1$class.compose(this, g); } @Override public A Function1Path, A andThen(Function1Object, A g) { return Function1$class.andThen(this, g); } }; But apparently my implementation of andThen is wrong, and I couldn't understand how I should implement it. 
It complains that the anonymous function: is not abstract and does not override abstract method AandThen$mcVJ$sp(scala.Function1scala.runtime.BoxedUnit,A) in scala.Function1 2- Tried to implement it as: Function1Path, Object fileFilter = new AbstractFunction1Path, Object() { @Override public Object apply(Path v1) { return true; } }; This one compiles, but when I run it I get an exception: 2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1 java.io.NotSerializableException: myModule$1 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke
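For reference, a minimal, self-contained sketch of a named-class variant of Akhil's snippet, assuming Spark Streaming 1.2.x and a JavaStreamingContext called jssc (the directory path and the ".json" suffix are only placeholders). Making the filter a top-level class that also implements Serializable avoids the NotSerializableException reported above for the anonymous-class attempt, because nothing non-serializable gets captured:

import java.io.Serializable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;

// A Serializable file filter that accepts only files whose names end with ".json".
public class JsonPathFilter extends AbstractFunction1<Path, Object> implements Serializable {
  @Override
  public Object apply(Path path) {
    // File-name filtering logic goes here.
    return path.getName().endsWith(".json");
  }
}

// Usage, following the same pattern as the snippet above:
//
//   jssc.ssc().<LongWritable, Text, TextInputFormat>fileStream(
//       "/path/to/input",                             // directory to watch (placeholder)
//       new JsonPathFilter(),                         // the Serializable filter defined above
//       true,                                         // newFilesOnly
//       ClassTag$.MODULE$.apply(LongWritable.class),
//       ClassTag$.MODULE$.apply(Text.class),
//       ClassTag$.MODULE$.apply(TextInputFormat.class));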
How to define a file filter for file name patterns in Apache Spark Streaming in Java?
Hello, I'm using Apache Spark Streaming 1.2.0 and trying to define a file filter for file names when creating an InputDStream https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html by invoking the fileStream https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 method. My code is working perfectly fine when I don't use a file filter, e.g. by invoking the other fileStream https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 method (described here https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29: https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 ). According to the documentation of *fileStream* method, I can pass it scala.Function1org.apache.hadoop.fs.Path,Object filter But so far, I could not create a fileFilter. My initial attempts have been 1- Tried to implement it as: Function1Path, Object fileFilter = new Function1Path, Object() { @Override public Object apply(Path v1) { return true; } @Override public A Function1A, Object compose(Function1A, Path g) { return Function1$class.compose(this, g); } @Override public A Function1Path, A andThen(Function1Object, A g) { return Function1$class.andThen(this, g); } }; But apparently my implementation of andThen is wrong, and I couldn't understand how I should implement it. 
It complains that the anonymous function: is not abstract and does not override abstract method AandThen$mcVJ$sp(scala.Function1scala.runtime.BoxedUnit,A) in scala.Function1 2- Tried to implement it as: Function1Path, Object fileFilter = new AbstractFunction1Path, Object() { @Override public Object apply(Path v1) { return true; } }; This one compiles, but when I run it I get an exception: 2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1 java.io.NotSerializableException: myModule$1 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184) at
Re: Spark streaming - tracking/deleting processed files
You can utilize the following method: https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 It has a parameter: newFilesOnly - Should process only new files and ignore existing files in the directory And it works as expected. -- Emre Sevinç On Fri, Jan 30, 2015 at 7:07 PM, ganterm gant...@gmail.com wrote: We are running a Spark streaming job that retrieves files from a directory (using textFileStream). One concern we are having is the case where the job is down but files are still being added to the directory. Once the job starts up again, those files are not being picked up (since they are not new or changed while the job is running) but we would like them to be processed. Is there a solution for that? Is there a way to keep track what files have been processed and can we force older files to be picked up? Is there a way to delete the processed files? Thanks! Markus -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tracking-deleting-processed-files-tp21444.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Emre Sevinc
Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con
Charles, Thank you very much for another suggestion. Unfortunately I couldn't make it work that way either. So I downgraded my SolrJ library from 4.10.3 to 4.0.0 [1]. Maybe using Relocating Classes [2] feature of Maven could handle this issue, but I did not want to complicate my pom.xml further, at least for now. 1- http://mvnrepository.com/artifact/org.apache.solr/solr-solrj/4.0.0 2- https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html -- Emre On Wed, Jan 28, 2015 at 5:40 PM, Charles Feduke charles.fed...@gmail.com wrote: Yeah it sounds like your original exclusion of commons-httpclient from hadoop-* was correct, but its still coming in from somewhere. Can you try something like this?: dependency artifactIdcommons-http/artifactId groupIdhttpclient/groupId scopeprovided/scope /dependency ref: http://stackoverflow.com/questions/4716310/is-there-a-way-to-exclude-a-maven-dependency-globally (I don't know if a provided dependency will work without a specific version number so I'm just making a guess here.) On Wed Jan 28 2015 at 11:24:02 AM Emre Sevinc emre.sev...@gmail.com wrote: When I examine the dependencies again, I see that SolrJ library is using v. 4.3.1 of org.apache.httpcomponents:httpclient [INFO] +- org.apache.solr:solr-solrj:jar:4.10.3:compile [INFO] | +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile == [INFO] | +- org.apache.httpcomponents:httpcore:jar:4.3:compile [INFO] | +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile [INFO] | +- org.codehaus.woodstox:wstx-asl:jar:3.2.7:compile [INFO] | \- org.noggit:noggit:jar:0.5:compile But hadoop-common 2.4.0 is using v. 3.1.1 of commons-httpclient:commons-httpclient : +- org.apache.hadoop:hadoop-common:jar:2.4.0:provided [INFO] | +- commons-cli:commons-cli:jar:1.2:compile [INFO] | +- org.apache.commons:commons-math3:jar:3.1.1:provided [INFO] | +- xmlenc:xmlenc:jar:0.52:compile [INFO] | +- commons-httpclient:commons-httpclient:jar:3.1:provided === [INFO] | +- commons-codec:commons-codec:jar:1.4:compile So my reasoning was: I have to exclude v. 3.1.1 of commons-httpclient:commons-httpclient and force it to use httpclient v. 4.3.1 that SolrJ declares as a dependency. But apparently somehow it does not work, I mean I have also tried your latest suggestion (changed the 'exclusion' to org.apache.httpcomponents and httpclient), still getting the same exception: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) Maybe it is about Hadoop 2.4.0, but I think this is what is included in the binary download of Spark. I've also tried it with Spark 1.2.0 binary (pre-built for Hadoop 2.4 and later). 
Or maybe I'm totally wrong, and the problem / fix is something completely different? -- Emre On Wed, Jan 28, 2015 at 4:58 PM, Charles Feduke charles.fed...@gmail.com wrote: It looks like you're shading in the Apache HTTP commons library and its a different version than what is expected. (Maybe 4.6.x based on the Javadoc.) I see you are attempting to exclude commons-httpclient by using: exclusion groupIdcommons-httpclient/groupId artifactId*/artifactId /exclusion in your pom. However, what I think you really want is: exclusion groupIdorg.apache.httpcomponents/groupId artifactIdhttpclient/artifactId /exclusion The last time the groupId was commons-httpclient was Aug 2007 as version 3.1 (search.maven.com). I hope none of your dependencies rely on that particular version. SchemeRegistryFactory was introduced in version 4.3.1 of httpcomponents so even if by chance one of them did rely on commons-httpclient there wouldn't be a class conflict. On Wed Jan 28 2015 at 9:19:20 AM Emre Sevinc emre.sev...@gmail.com wrote: This is what I get: ./bigcontent-1.0-SNAPSHOT.jar:org/apache/http/impl/conn/Sch emeRegistryFactory.class (probably because I'm using a self-contained JAR). In other words, I'm still stuck. -- Emre On Wed, Jan 28, 2015 at 2:47 PM, Charles Feduke
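For the archives, the class-relocation approach mentioned in [2] would look roughly like the sketch below in the maven-shade-plugin configuration (the shadedPattern name is arbitrary and only illustrative). It moves the newer httpclient classes that SolrJ needs into a private package inside the fat JAR, so they can no longer clash with whatever httpclient version Hadoop/YARN already has on the classpath:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <!-- Relocate org.apache.http so the 4.3.x classes bundled for SolrJ
             cannot be shadowed by an older copy on the cluster classpath -->
        <pattern>org.apache.http</pattern>
        <shadedPattern>shaded.org.apache.http</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>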
Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con
This is what I get: ./bigcontent-1.0-SNAPSHOT.jar:org/apache/http/impl/conn/SchemeRegistryFactory.class (probably because I'm using a self-contained JAR). In other words, I'm still stuck. -- Emre On Wed, Jan 28, 2015 at 2:47 PM, Charles Feduke charles.fed...@gmail.com wrote: I deal with problems like this so often across Java applications with large dependency trees. Add the shell function at the following link to your shell on the machine where your Spark Streaming is installed: https://gist.github.com/cfeduke/fe63b12ab07f87e76b38 Then run in the directory where your JAR files are: find-java-class SchemeRegistryFactory (I know you said HttpClient but the error seems to be an overload or method of SchemeRegistryFactory is missing from the class that is loaded by the class loader. The class loader loads the first class it finds that match the package/class name coordinates.) You'll then be able to zero in on the JAR that is bringing in an older version of that class. Once you've done that you can exclude that JAR's older dependency from in in your pom. If you find out that the newer version is incompatible you'll have to perform some magic with the Maven shade plugin. On Wed Jan 28 2015 at 8:00:22 AM Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using *Spark 1.1.0* and *Solr 4.10.3*. I'm getting an exception when using *HttpSolrServer* from within Spark Streaming: 15/01/28 13:42:52 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) Normally, when I use my utility class that uses SolrJ to connect to a Solr server and run it by itself (running it stand-alone without Spark), everything works as expected. But when I invoke that utility class inside a Spark Streaming application, I get the exception above as soon as it is trying to establish a connection to the Solr server. My preliminary Internet search led me to believe that some Spark or Hadoop components bring an older version of *httpclient*, so I've tried to exclude them in my pom.xml. But I still get the same exception. Any ideas why? Or how can I fix it? When I analyze my pom.xml dependencies, I get: $ mvn dependency:tree -Ddetail=true | grep http [INFO] | | \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:provided [INFO] | +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile [INFO] | +- org.apache.httpcomponents:httpcore:jar:4.3:compile [INFO] | +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile The whole dependency tree is: $ mvn dependency:tree -Ddetail=true [INFO] Scanning for projects... 
[INFO] [INFO] [INFO] Building bigcontent 1.0-SNAPSHOT [INFO] [INFO] [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ bigcontent --- [INFO] bigcontent:bigcontent:jar:1.0-SNAPSHOT [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.1.1:provided [INFO] | +- org.apache.spark:spark-core_2.10:jar:1.1.1:provided [INFO] | | +- org.apache.curator:curator-recipes:jar:2.4.0:provided [INFO] | | | \- org.apache.curator:curator-framework:jar:2.4.0:provided [INFO] | | | \- org.apache.curator:curator-client:jar:2.4.0:provided [INFO] | | +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:provided [INFO] | | | +- org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v201105210645:provided [INFO] | | | +- org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:provided [INFO] | | | | +- org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:provided [INFO] | | | | \- org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:provided [INFO] | | | \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:provided [INFO] | | | \- org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v201005082020:provided [INFO] | | |\- org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105071233:provided [INFO
Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/sc
Hello, I'm using *Spark 1.1.0* and *Solr 4.10.3*. I'm getting an exception when using *HttpSolrServer* from within Spark Streaming: 15/01/28 13:42:52 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) Normally, when I use my utility class that uses SolrJ to connect to a Solr server and run it by itself (running it stand-alone without Spark), everything works as expected. But when I invoke that utility class inside a Spark Streaming application, I get the exception above as soon as it is trying to establish a connection to the Solr server. My preliminary Internet search led me to believe that some Spark or Hadoop components bring an older version of *httpclient*, so I've tried to exclude them in my pom.xml. But I still get the same exception. Any ideas why? Or how can I fix it? When I analyze my pom.xml dependencies, I get: $ mvn dependency:tree -Ddetail=true | grep http [INFO] | | \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:provided [INFO] | +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile [INFO] | +- org.apache.httpcomponents:httpcore:jar:4.3:compile [INFO] | +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile The whole dependency tree is: $ mvn dependency:tree -Ddetail=true [INFO] Scanning for projects... 
[INFO] [INFO] [INFO] Building bigcontent 1.0-SNAPSHOT [INFO] [INFO] [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ bigcontent --- [INFO] bigcontent:bigcontent:jar:1.0-SNAPSHOT [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.1.1:provided [INFO] | +- org.apache.spark:spark-core_2.10:jar:1.1.1:provided [INFO] | | +- org.apache.curator:curator-recipes:jar:2.4.0:provided [INFO] | | | \- org.apache.curator:curator-framework:jar:2.4.0:provided [INFO] | | | \- org.apache.curator:curator-client:jar:2.4.0:provided [INFO] | | +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:provided [INFO] | | | +- org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v201105210645:provided [INFO] | | | +- org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:provided [INFO] | | | | +- org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:provided [INFO] | | | | \- org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:provided [INFO] | | | \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:provided [INFO] | | | \- org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v201005082020:provided [INFO] | | |\- org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105071233:provided [INFO] | | +- org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:provided [INFO] | | +- org.eclipse.jetty:jetty-util:jar:8.1.14.v20131031:provided [INFO] | | +- org.apache.commons:commons-lang3:jar:3.3.2:provided [INFO] | | +- org.slf4j:jul-to-slf4j:jar:1.7.5:provided [INFO] | | +- org.slf4j:jcl-over-slf4j:jar:1.7.5:provided [INFO] | | +- com.ning:compress-lzf:jar:1.0.0:provided [INFO] | | +- net.jpountz.lz4:lz4:jar:1.2.0:provided [INFO] | | +- com.twitter:chill_2.10:jar:0.3.6:provided [INFO] | | | \- com.esotericsoftware.kryo:kryo:jar:2.21:provided [INFO] | | | +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:provided [INFO] | | | +- com.esotericsoftware.minlog:minlog:jar:1.2:provided [INFO] | | | \- org.objenesis:objenesis:jar:1.2:provided [INFO] | | +- com.twitter:chill-java:jar:0.3.6:provided [INFO] | | +- org.spark-project.akka:akka-remote_2.10:jar:2.2.3-shaded-protobuf:provided [INFO] | | | +- org.spark-project.akka:akka-actor_2.10:jar:2.2.3-shaded-protobuf:provided [INFO] | | | | \- com.typesafe:config:jar:1.0.2:provided [INFO] | | | +- org.spark-project.protobuf:protobuf-java:jar:2.4.1-shaded:provided [INFO] | | | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:provided [INFO] | | +- org.spark-project.akka:akka-slf4j_2.10:jar:2.2.3-shaded-protobuf:provided [INFO] | | +-
Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con
When I examine the dependencies again, I see that SolrJ library is using v. 4.3.1 of org.apache.httpcomponents:httpclient [INFO] +- org.apache.solr:solr-solrj:jar:4.10.3:compile [INFO] | +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile == [INFO] | +- org.apache.httpcomponents:httpcore:jar:4.3:compile [INFO] | +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile [INFO] | +- org.codehaus.woodstox:wstx-asl:jar:3.2.7:compile [INFO] | \- org.noggit:noggit:jar:0.5:compile But hadoop-common 2.4.0 is using v. 3.1.1 of commons-httpclient:commons-httpclient : +- org.apache.hadoop:hadoop-common:jar:2.4.0:provided [INFO] | +- commons-cli:commons-cli:jar:1.2:compile [INFO] | +- org.apache.commons:commons-math3:jar:3.1.1:provided [INFO] | +- xmlenc:xmlenc:jar:0.52:compile [INFO] | +- commons-httpclient:commons-httpclient:jar:3.1:provided === [INFO] | +- commons-codec:commons-codec:jar:1.4:compile So my reasoning was: I have to exclude v. 3.1.1 of commons-httpclient:commons-httpclient and force it to use httpclient v. 4.3.1 that SolrJ declares as a dependency. But apparently somehow it does not work, I mean I have also tried your latest suggestion (changed the 'exclusion' to org.apache.httpcomponents and httpclient), still getting the same exception: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry; at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121) at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445) at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206) at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35) at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142) at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168) at org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141) Maybe it is about Hadoop 2.4.0, but I think this is what is included in the binary download of Spark. I've also tried it with Spark 1.2.0 binary (pre-built for Hadoop 2.4 and later). Or maybe I'm totally wrong, and the problem / fix is something completely different? -- Emre On Wed, Jan 28, 2015 at 4:58 PM, Charles Feduke charles.fed...@gmail.com wrote: It looks like you're shading in the Apache HTTP commons library and its a different version than what is expected. (Maybe 4.6.x based on the Javadoc.) I see you are attempting to exclude commons-httpclient by using: exclusion groupIdcommons-httpclient/groupId artifactId*/artifactId /exclusion in your pom. However, what I think you really want is: exclusion groupIdorg.apache.httpcomponents/groupId artifactIdhttpclient/artifactId /exclusion The last time the groupId was commons-httpclient was Aug 2007 as version 3.1 (search.maven.com). I hope none of your dependencies rely on that particular version. SchemeRegistryFactory was introduced in version 4.3.1 of httpcomponents so even if by chance one of them did rely on commons-httpclient there wouldn't be a class conflict. On Wed Jan 28 2015 at 9:19:20 AM Emre Sevinc emre.sev...@gmail.com wrote: This is what I get: ./bigcontent-1.0-SNAPSHOT.jar:org/apache/http/impl/conn/Sch emeRegistryFactory.class (probably because I'm using a self-contained JAR). 
In other words, I'm still stuck. -- Emre On Wed, Jan 28, 2015 at 2:47 PM, Charles Feduke charles.fed...@gmail.com wrote: I deal with problems like this so often across Java applications with large dependency trees. Add the shell function at the following link to your shell on the machine where your Spark Streaming is installed: https://gist.github.com/cfeduke/fe63b12ab07f87e76b38 Then run in the directory where your JAR files are: find-java-class SchemeRegistryFactory (I know you said HttpClient but the error seems to be an overload or method of SchemeRegistryFactory is missing from the class that is loaded by the class loader. The class loader loads the first class it finds that match the package/class name coordinates.) You'll then be able to zero in on the JAR that is bringing in an older version of that class. Once you've done that you can exclude that JAR's older dependency from in in your pom. If you find out that the newer version is incompatible you'll have to perform some magic with the Maven shade plugin. On Wed Jan 28 2015 at 8:00:22 AM Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using *Spark 1.1.0* and *Solr 4.10.3*. I'm getting an exception when using *HttpSolrServer* from within Spark Streaming: 15/01/28 13:42:52 ERROR Executor
Why does consuming a RESTful web service (using javax.ws.rs.* and Jersey) work in a unit test but not when submitted to Spark?
Hello, I have a piece of code that runs inside Spark Streaming and tries to get some data from a RESTful web service (that runs locally on my machine). The code snippet in question is: Client client = ClientBuilder.newClient(); WebTarget target = client.target(http://localhost:/rest;); target = target.path(annotate) .queryParam(text, UrlEscapers.urlFragmentEscaper().escape(spotlightSubmission)) .queryParam(confidence, 0.3); logger.warn(!!! DEBUG !!! target: {}, target.getUri().toString()); String response = target.request().accept(MediaType.APPLICATION_JSON_TYPE).get(String.class); logger.warn(!!! DEBUG !!! Spotlight response: {}, response); When run inside a unit test as follows: mvn clean test -Dtest=SpotlightTest#testCountWords it contacts the RESTful web service and retrieves some data as expected. But when the same code is run as part of the application that is submitted to Spark, using spark-submit script I receive the following error: java.lang.NoSuchMethodError: javax.ws.rs.core.MultivaluedMap.addAll(Ljava/lang/Object;[Ljava/lang/Object;)V I'm using Spark 1.1.0 and for consuming the web service I'm using Jersey in my project's pom.xml: dependency groupIdorg.glassfish.jersey.containers/groupId artifactIdjersey-container-servlet-core/artifactId version2.14/version /dependency So I suspect that when the application is submitted to Spark, somehow there's a different JAR in the environment that uses a different version of Jersey / javax.ws.rs.* Does anybody know which version of Jersey / javax.ws.rs.* is used in the Spark environment, or how to solve this conflict? -- Emre Sevinç https://be.linkedin.com/in/emresevinc/
Re: Why does consuming a RESTful web service (using javax.ws.rs.* and Jersey) work in a unit test but not when submitted to Spark?
On Wed, Dec 24, 2014 at 1:46 PM, Sean Owen so...@cloudera.com wrote: I'd take a look with 'mvn dependency:tree' on your own code first. Maybe you are including JavaEE 6 for example? For reference, my complete pom.xml looks like: project xmlns=http://maven.apache.org/POM/4.0.0; xmlns:xsi= http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation=http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd; modelVersion4.0.0/modelVersion groupIdbigcontent/groupId artifactIdbigcontent/artifactId version1.0-SNAPSHOT/version packagingjar/packaging namebigcontent/name urlhttp://maven.apache.org/url properties project.build.sourceEncodingUTF-8/project.build.sourceEncoding /properties build plugins plugin groupIdorg.apache.maven.plugins/groupId artifactIdmaven-shade-plugin/artifactId version2.3/version configuration !-- put your configurations here -- /configuration executions execution phasepackage/phase goals goalshade/goal /goals /execution /executions /plugin plugin groupIdorg.apache.maven.plugins/groupId artifactIdmaven-compiler-plugin/artifactId version3.2/version configuration source1.7/source target1.7/target /configuration /plugin /plugins /build dependencies dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId version1.1.1/version scopeprovided/scope /dependency dependency groupIdorg.glassfish.jersey.containers/groupId artifactIdjersey-container-servlet-core/artifactId version2.14/version /dependency dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId version2.4.0/version /dependency dependency groupIdcom.google.guava/groupId artifactIdguava/artifactId version16.0/version /dependency dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-mapreduce-client-core/artifactId version2.4.0/version /dependency dependency groupIdjson-mapreduce/groupId artifactIdjson-mapreduce/artifactId version1.0-SNAPSHOT/version exclusions exclusion groupIdjavax.servlet/groupId artifactId*/artifactId /exclusion exclusion groupIdcommons-io/groupId artifactId*/artifactId /exclusion exclusion groupIdcommons-lang/groupId artifactId*/artifactId /exclusion exclusion groupIdorg.apache.hadoop/groupId artifactIdhadoop-common/artifactId /exclusion /exclusions /dependency dependency groupIdorg.apache.avro/groupId artifactIdavro-mapred/artifactId version1.7.7/version exclusions exclusion groupIdjavax.servlet/groupId artifactId*/artifactId /exclusion exclusion groupIdorg.apache.hadoop/groupId artifactIdhadoop-common/artifactId /exclusion /exclusions /dependency dependency groupIdjunit/groupId artifactIdjunit/artifactId version4.11/version scopetest/scope /dependency dependency groupIdorg.apache.avro/groupId artifactIdavro/artifactId version1.7.7/version exclusions exclusion groupIdjavax.servlet/groupId artifactId*/artifactId /exclusion /exclusions /dependency dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-common/artifactId version2.4.0/version scopeprovided/scope exclusions exclusion groupIdjavax.servlet/groupId artifactId*/artifactId /exclusion exclusion groupIdcom.google.guava/groupId artifactId*/artifactId /exclusion /exclusions /dependency dependency groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId version1.7.7/version /dependency /dependencies /project And 'mvn dependency:tree' produces the following output: [INFO] Scanning for projects... 
[INFO] [INFO] [INFO] Building bigcontent 1.0-SNAPSHOT [INFO] [INFO] [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ bigcontent --- [INFO] bigcontent:bigcontent:jar:1.0-SNAPSHOT [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.1.1:provided [INFO] | +- org.apache.spark:spark-core_2.10:jar:1.1.1:provided [INFO] | | +- org.apache.curator:curator-recipes:jar:2.4.0:provided [INFO] | | | \- org.apache.curator:curator-framework:jar:2.4.0:provided [INFO] | | | \-
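As a small follow-up on the dependency:tree suggestion: the same output can be filtered to zero in on the JAX-RS / Jersey artifacts specifically, just like the http artifacts were grepped earlier in this thread, e.g.:

mvn dependency:tree -Ddetail=true | grep -i jersey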
Re: Why does consuming a RESTful web service (using javax.ws.rs.* and Jersey) work in a unit test but not when submitted to Spark?
It seems like YARN depends an older version of Jersey, that is 1.9: https://github.com/apache/spark/blob/master/yarn/pom.xml When I've modified my dependencies to have only: dependency groupIdcom.sun.jersey/groupId artifactIdjersey-core/artifactId version1.9.1/version /dependency And then modified the code to use the old Jersey API: Client c = Client.create(); WebResource r = c.resource(http://localhost:/rest;) .path(annotate) .queryParam(text, UrlEscapers.urlFragmentEscaper().escape(spotlightSubmission)) .queryParam(confidence, 0.3); logger.warn(!!! DEBUG !!! target: {}, r.getURI()); String response = r.accept(MediaType.APPLICATION_JSON_TYPE) //.header() .get(String.class); logger.warn(!!! DEBUG !!! Spotlight response: {}, response); It seems to work when I use spark-submit to submit the application that includes this code. Funny thing is, now my relevant unit test does not run, complaining about not having enough memory: Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0xc490, 25165824, 0) failed; error='Cannot allocate memory' (errno=12) # # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (mmap) failed to map 25165824 bytes for committing reserved memory. -- Emre On Wed, Dec 24, 2014 at 1:46 PM, Sean Owen so...@cloudera.com wrote: Your guess is right, that there are two incompatible versions of Jersey (or really, JAX-RS) in your runtime. Spark doesn't use Jersey, but its transitive dependencies may, or your transitive dependencies may. I don't see Jersey in Spark's dependency tree except from HBase tests, which in turn only appear in examples, so that's unlikely to be it. I'd take a look with 'mvn dependency:tree' on your own code first. Maybe you are including JavaEE 6 for example? On Wed, Dec 24, 2014 at 12:02 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have a piece of code that runs inside Spark Streaming and tries to get some data from a RESTful web service (that runs locally on my machine). The code snippet in question is: Client client = ClientBuilder.newClient(); WebTarget target = client.target(http://localhost:/rest;); target = target.path(annotate) .queryParam(text, UrlEscapers.urlFragmentEscaper().escape(spotlightSubmission)) .queryParam(confidence, 0.3); logger.warn(!!! DEBUG !!! target: {}, target.getUri().toString()); String response = target.request().accept(MediaType.APPLICATION_JSON_TYPE).get(String.class); logger.warn(!!! DEBUG !!! Spotlight response: {}, response); When run inside a unit test as follows: mvn clean test -Dtest=SpotlightTest#testCountWords it contacts the RESTful web service and retrieves some data as expected. But when the same code is run as part of the application that is submitted to Spark, using spark-submit script I receive the following error: java.lang.NoSuchMethodError: javax.ws.rs.core.MultivaluedMap.addAll(Ljava/lang/Object;[Ljava/lang/Object;)V I'm using Spark 1.1.0 and for consuming the web service I'm using Jersey in my project's pom.xml: dependency groupIdorg.glassfish.jersey.containers/groupId artifactIdjersey-container-servlet-core/artifactId version2.14/version /dependency So I suspect that when the application is submitted to Spark, somehow there's a different JAR in the environment that uses a different version of Jersey / javax.ws.rs.* Does anybody know which version of Jersey / javax.ws.rs.* is used in the Spark environment, or how to solve this conflict? -- Emre Sevinç https://be.linkedin.com/in/emresevinc/ -- Emre Sevinc
Re: Why does consuming a RESTful web service (using javax.ws.rs.* and Jersey) work in a unit test but not when submitted to Spark?
Sean, Thanks a lot for the important information, especially userClassPathFirst. Cheers, Emre On Wed, Dec 24, 2014 at 3:38 PM, Sean Owen so...@cloudera.com wrote: That could well be it -- oops, I forgot to run with the YARN profile and so didn't see the YARN dependencies. Try the userClassPathFirst option to try to make your app's copy take precedence. The second error is really a JVM bug, but, is from having too little memory available for the unit tests. http://spark.apache.org/docs/latest/building-spark.html#setting-up-mavens-memory-usage On Wed, Dec 24, 2014 at 1:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: It seems like YARN depends an older version of Jersey, that is 1.9: https://github.com/apache/spark/blob/master/yarn/pom.xml When I've modified my dependencies to have only: dependency groupIdcom.sun.jersey/groupId artifactIdjersey-core/artifactId version1.9.1/version /dependency And then modified the code to use the old Jersey API: Client c = Client.create(); WebResource r = c.resource(http://localhost:/rest;) .path(annotate) .queryParam(text, UrlEscapers.urlFragmentEscaper().escape(spotlightSubmission)) .queryParam(confidence, 0.3); logger.warn(!!! DEBUG !!! target: {}, r.getURI()); String response = r.accept(MediaType.APPLICATION_JSON_TYPE) //.header() .get(String.class); logger.warn(!!! DEBUG !!! Spotlight response: {}, response); It seems to work when I use spark-submit to submit the application that includes this code. Funny thing is, now my relevant unit test does not run, complaining about not having enough memory: Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0xc490, 25165824, 0) failed; error='Cannot allocate memory' (errno=12) # # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (mmap) failed to map 25165824 bytes for committing reserved memory. -- Emre On Wed, Dec 24, 2014 at 1:46 PM, Sean Owen so...@cloudera.com wrote: Your guess is right, that there are two incompatible versions of Jersey (or really, JAX-RS) in your runtime. Spark doesn't use Jersey, but its transitive dependencies may, or your transitive dependencies may. I don't see Jersey in Spark's dependency tree except from HBase tests, which in turn only appear in examples, so that's unlikely to be it. I'd take a look with 'mvn dependency:tree' on your own code first. Maybe you are including JavaEE 6 for example? On Wed, Dec 24, 2014 at 12:02 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have a piece of code that runs inside Spark Streaming and tries to get some data from a RESTful web service (that runs locally on my machine). The code snippet in question is: Client client = ClientBuilder.newClient(); WebTarget target = client.target(http://localhost:/rest;); target = target.path(annotate) .queryParam(text, UrlEscapers.urlFragmentEscaper().escape(spotlightSubmission)) .queryParam(confidence, 0.3); logger.warn(!!! DEBUG !!! target: {}, target.getUri().toString()); String response = target.request().accept(MediaType.APPLICATION_JSON_TYPE).get(String.class); logger.warn(!!! DEBUG !!! Spotlight response: {}, response); When run inside a unit test as follows: mvn clean test -Dtest=SpotlightTest#testCountWords it contacts the RESTful web service and retrieves some data as expected. 
But when the same code is run as part of the application that is submitted to Spark, using spark-submit script I receive the following error: java.lang.NoSuchMethodError: javax.ws.rs.core.MultivaluedMap.addAll(Ljava/lang/Object;[Ljava/lang/Object;)V I'm using Spark 1.1.0 and for consuming the web service I'm using Jersey in my project's pom.xml: dependency groupIdorg.glassfish.jersey.containers/groupId artifactIdjersey-container-servlet-core/artifactId version2.14/version /dependency So I suspect that when the application is submitted to Spark, somehow there's a different JAR in the environment that uses a different version of Jersey / javax.ws.rs.* Does anybody know which version of Jersey / javax.ws.rs.* is used in the Spark environment, or how to solve this conflict? -- Emre Sevinç https://be.linkedin.com/in/emresevinc/ -- Emre Sevinc -- Emre Sevinc
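For the archives, a concrete sketch of both suggestions (the exact property name is an assumption on my side and varies between Spark versions, so please check the configuration page of the release you are on). Giving the application's own Jersey copy precedence over the one on the YARN classpath would look something like:

spark-submit --class myModule.myClass \
  --master yarn-client \
  --conf spark.files.userClassPathFirst=true \
  myApp.jar

And for the memory error in the unit tests, the building-spark page linked above recommends raising Maven's memory along these lines:

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"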
Re: Unit testing and Spark Streaming
On Fri, Dec 12, 2014 at 2:17 PM, Eric Loots eric.lo...@gmail.com wrote: How can the log level in test mode be reduced (or extended when needed) ? Hello Eric, The following might be helpful for reducing the log messages during unit testing: http://stackoverflow.com/a/2736/236007 -- Emre Sevinç https://be.linkedin.com/in/emresevinc
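To make that pointer self-contained: a minimal log4j.properties dropped into src/test/resources (so it ends up on the test classpath) can quiet Spark during unit tests. The package names and levels below are just an example, adjust them to taste:

log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Quiet the chattiest packages during tests
log4j.logger.org.apache.spark=WARN
log4j.logger.org.eclipse.jetty=ERROR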
How can I make Spark Streaming count the words in a file in a unit test?
Hello, I've successfully built a very simple Spark Streaming application in Java that is based on the HdfsCount example in Scala at https://github.com/apache/spark/blob/branch-1.1/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala . When I submit this application to my local Spark, it waits for a file to be written to a given directory, and when I create that file it successfully prints the number of words. I terminate the application by pressing Ctrl+C. Now I've tried to create a very basic unit test for this functionality, but in the test I was not able to print the same information, that is the number of words. What am I missing? Below is the unit test file, and after that I've also included the code snippet that shows the countWords method: = StarterAppTest.java = import com.google.common.io.Files; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.*; import java.io.*; public class StarterAppTest { JavaStreamingContext ssc; File tempDir; @Before public void setUp() { ssc = new JavaStreamingContext(local, test, new Duration(3000)); tempDir = Files.createTempDir(); tempDir.deleteOnExit(); } @After public void tearDown() { ssc.stop(); ssc = null; } @Test public void testInitialization() { Assert.assertNotNull(ssc.sc()); } @Test public void testCountWords() { StarterApp starterApp = new StarterApp(); try { JavaDStreamString lines = ssc.textFileStream(tempDir.getAbsolutePath()); JavaPairDStreamString, Integer wordCounts = starterApp.countWords(lines); System.err.println(= Word Counts ===); wordCounts.print(); System.err.println(= Word Counts ===); ssc.start(); File tmpFile = new File(tempDir.getAbsolutePath(), tmp.txt); PrintWriter writer = new PrintWriter(tmpFile, UTF-8); writer.println(8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin); writer.close(); System.err.println(= Word Counts ===); wordCounts.print(); System.err.println(= Word Counts ===); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } Assert.assertTrue(true); } } = This test compiles and starts to run, Spark Streaming prints a lot of diagnostic messages on the console but the calls to wordCounts.print(); does not print anything, whereas in StarterApp.java itself, they do. I've also added ssc.awaitTermination(); after ssc.start() but nothing changed in that respect. After that I've also tried to create a new file in the directory that this Spark Streaming application was checking but this time it gave an error. For completeness, below is the wordCounts method: public JavaPairDStreamString, Integer countWords(JavaDStreamString lines) { JavaDStreamString words = lines.flatMap(new FlatMapFunctionString, String() { @Override public IterableString call(String x) { return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStreamString, Integer wordCounts = words.mapToPair( new PairFunctionString, String, Integer() { @Override public Tuple2String, Integer call(String s) { return new Tuple2(s, 1); } }).reduceByKey((i1, i2) - i1 + i2); return wordCounts; } Kind regards Emre Sevinç
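In case it helps someone searching the archives later, one pattern for making the counts observable from the test thread is sketched below. It is not a verified fix for the exact test above, just an illustration: register an output operation that collects each batch's results into a driver-side list, start the context, write the input file as before, and wait for at least one batch interval (3000 ms here) before asserting.

// Additional imports on top of those already in StarterAppTest:
// import java.util.ArrayList;
// import java.util.Collections;
// import java.util.List;
// import org.apache.spark.api.java.JavaPairRDD;
// import org.apache.spark.api.java.function.Function;
// import scala.Tuple2;

final List<Tuple2<String, Integer>> results =
    Collections.synchronizedList(new ArrayList<Tuple2<String, Integer>>());

// Collect every batch's word counts back to the driver so the test can inspect them.
wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd) {
    results.addAll(rdd.collect());
    return null;
  }
});

ssc.start();
// ... write tmp.txt into tempDir here, exactly as in the test above ...
Thread.sleep(2 * 3000);  // give Spark Streaming at least one full batch to pick up the file
Assert.assertFalse(results.isEmpty());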
How can I compile only the core and streaming (so that I can get test utilities of streaming)?
Hello, I'm currently developing a Spark Streaming application and trying to write my first unit test. I've used Java for this application, and I also need to use Java (and JUnit) for writing unit tests. I could not find any documentation that focuses on Spark Streaming unit testing, all I could find was the Java based unit tests in Spark Streaming source code: https://github.com/apache/spark/blob/branch-1.1/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java that depends on a Scala file: https://github.com/apache/spark/blob/branch-1.1/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala which, in turn, depends on the Scala test files in https://github.com/apache/spark/tree/branch-1.1/streaming/src/test/scala/org/apache/spark/streaming So I thought that I could grab the Spark source code, switch to the branch-1.1 branch and then only compile the 'core' and 'streaming' modules, hopefully ending up with the compiled classes (or jar files) of the Streaming test utilities, so that I can import them in my Java based Spark Streaming application. However, trying to build it via the following command line failed: mvn -pl core,streaming package You can see the full output at the end of this message. Any ideas how to progress? Full output of the build: emre@emre-ubuntu:~/code/spark$ mvn -pl core,streaming package [INFO] Scanning for projects... 
[INFO] [INFO] Reactor Build Order: [INFO] [INFO] Spark Project Core [INFO] Spark Project Streaming [INFO] [INFO] [INFO] Building Spark Project Core 1.1.2-SNAPSHOT [INFO] Downloading: https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.7/maven-antrun-plugin-1.7.pom Downloaded: https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.7/maven-antrun-plugin-1.7.pom (5 KB at 5.8 KB/sec) Downloading: https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.7/maven-antrun-plugin-1.7.jar Downloaded: https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.7/maven-antrun-plugin-1.7.jar (31 KB at 200.4 KB/sec) Downloading: https://repo1.maven.org/maven2/org/apache/commons/commons-math3/3.3/commons-math3-3.3.pom Downloaded: https://repo1.maven.org/maven2/org/apache/commons/commons-math3/3.3/commons-math3-3.3.pom (24 KB at 178.9 KB/sec) Downloading: https://repo1.maven.org/maven2/org/spark-project/akka/akka-testkit_2.10/2.2.3-shaded-protobuf/akka-testkit_2.10-2.2.3-shaded-protobuf.pom Downloaded: https://repo1.maven.org/maven2/org/spark-project/akka/akka-testkit_2.10/2.2.3-shaded-protobuf/akka-testkit_2.10-2.2.3-shaded-protobuf.pom (3 KB at 22.5 KB/sec) Downloading: https://repo1.maven.org/maven2/org/scala-lang/scalap/2.10.4/scalap-2.10.4.pom Downloaded: https://repo1.maven.org/maven2/org/scala-lang/scalap/2.10.4/scalap-2.10.4.pom (2 KB at 19.2 KB/sec) Downloading: https://repo1.maven.org/maven2/org/apache/derby/derby/10.4.2.0/derby-10.4.2.0.pom Downloaded: https://repo1.maven.org/maven2/org/apache/derby/derby/10.4.2.0/derby-10.4.2.0.pom (2 KB at 14.9 KB/sec) Downloading: https://repo1.maven.org/maven2/org/mockito/mockito-all/1.9.0/mockito-all-1.9.0.pom Downloaded: https://repo1.maven.org/maven2/org/mockito/mockito-all/1.9.0/mockito-all-1.9.0.pom (1010 B at 4.1 KB/sec) Downloading: https://repo1.maven.org/maven2/org/easymock/easymockclassextension/3.1/easymockclassextension-3.1.pom Downloaded: https://repo1.maven.org/maven2/org/easymock/easymockclassextension/3.1/easymockclassextension-3.1.pom (5 KB at 42.9 KB/sec) Downloading: https://repo1.maven.org/maven2/org/easymock/easymock-parent/3.1/easymock-parent-3.1.pom Downloaded: https://repo1.maven.org/maven2/org/easymock/easymock-parent/3.1/easymock-parent-3.1.pom (13 KB at 133.8 KB/sec) Downloading: https://repo1.maven.org/maven2/org/easymock/easymock/3.1/easymock-3.1.pom Downloaded: https://repo1.maven.org/maven2/org/easymock/easymock/3.1/easymock-3.1.pom (6 KB at 38.8 KB/sec) Downloading:
Re: How can I compile only the core and streaming (so that I can get test utilities of streaming)?
Hello, Specifying '-DskipTests' on the command line worked, though I can't be sure whether first running 'sbt assembly' also contributed to the solution. (I tried 'sbt assembly' because branch-1.1's README says to use sbt.) Thanks for the answer. Kind regards, Emre Sevinç
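So, for anyone else who only needs the streaming test utilities: the command that ended up working is simply the original one plus the -DskipTests flag (whether the preceding 'sbt assembly' run is actually required remains unclear, as noted above):

mvn -pl core,streaming -DskipTests package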