Re: Building Spark

2015-05-13 Thread Emre Sevinc
My 2 cents: If you have Java 8, you don't need any extra settings for
Maven.

--
Emre Sevinç

On Wed, May 13, 2015 at 3:02 PM, Stephen Boesch java...@gmail.com wrote:

 Hi Akhil,   Building with sbt tends to need around 3.5 GB, whereas Maven's
 requirements are much lower, around 1.7 GB. So try using Maven.

 For reference I have the following settings and both do compile.  sbt
 would not work with lower values.


 $echo $SBT_OPTS
 -Xmx3012m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
 $echo $MAVEN_OPTS
 -Xmx1280m -XX:MaxPermSize=384m

 2015-05-13 5:57 GMT-07:00 Heisenberg Bb hbbalg...@gmail.com:

 I tried to build Spark on my local machine, Ubuntu 14.04 (4 GB RAM), and my
 system hangs (freezes). When I monitored the system processes, the build
 process was consuming 85% of my memory. Why does it need so many
 resources? Is there a more efficient way to build Spark?

 Thanks
 Akhil





-- 
Emre Sevinc


Re: How to deal with code that runs before foreach block in Apache Spark?

2015-05-06 Thread Emre Sevinc
Imran, Gerard,

Indeed your suggestions were correct and it helped me. Thank you for your
replies.

--
Emre

On Tue, May 5, 2015 at 4:24 PM, Imran Rashid iras...@cloudera.com wrote:

 Gerard is totally correct -- to expand a little more, I think what you
 want to do is a solrInputDocumentJavaRDD.foreachPartition, instead of
 solrInputDocumentJavaRDD.foreach:


 solrInputDocumentJavaRDD.foreachPartition(
   new VoidFunction<Iterator<SolrInputDocument>>() {
     @Override
     public void call(Iterator<SolrInputDocument> docItr) {
       List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
       while (docItr.hasNext()) {
         // Add each SolrInputDocument to the list of SolrInputDocuments
         docs.add(docItr.next());
       }
       // push things to solr **from the executor, for this partition**
       // so for this to make sense, you need to be sure solr can handle a bunch
       // of executors pushing into it simultaneously.
       addThingsToSolr(docs);
     }
   });

 On Mon, May 4, 2015 at 8:44 AM, Gerard Maas gerard.m...@gmail.com wrote:

 I'm not familiar with the Solr API, but provided that 'SolrIndexerDriver'
 is a singleton, I guess that what's going on when running on a cluster is
 that the call to:

  SolrIndexerDriver.solrInputDocumentList.add(elem)

 is happening on different singleton instances of the  SolrIndexerDriver
 on different JVMs while

 SolrIndexerDriver.solrServer.commit

 is happening on the driver.

 In practical terms, the lists on the executors are being filled-in but
 they are never committed and on the driver the opposite is happening.

 -kr, Gerard

 On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 I'm trying to deal with some code that runs differently on Spark
 stand-alone mode and Spark running on a cluster. Basically, for each item
 in an RDD, I'm trying to add it to a list, and once this is done, I want to
 send this list to Solr.

 This works perfectly fine when I run the following code in stand-alone
 mode of Spark, but does not work when the same code is run on a cluster.
 When I run the same code on a cluster, it is as if the "send to Solr" part
 of the code is executed before the list to be sent to Solr is filled with
 items. I tried to force the execution with solrInputDocumentJavaRDD.collect();
 after foreach, but it seems to have no effect.

 // For each RDD
 solrInputDocumentJavaDStream.foreachRDD(
     new Function<JavaRDD<SolrInputDocument>, Void>() {
       @Override
       public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception {

         // For each item in a single RDD
         solrInputDocumentJavaRDD.foreach(
             new VoidFunction<SolrInputDocument>() {
               @Override
               public void call(SolrInputDocument solrInputDocument) {
                 // Add the solrInputDocument to the list of SolrInputDocuments
                 SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
               }
             });

         // Try to force execution
         solrInputDocumentJavaRDD.collect();

         // After having finished adding every SolrInputDocument to the list,
         // add it to the solrServer, and commit, waiting for the commit to be flushed
         try {
           // Seems like when run in cluster mode, the list size is zero,
           // therefore the following part is never executed
           if (SolrIndexerDriver.solrInputDocumentList != null
               && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
             SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
             SolrIndexerDriver.solrServer.commit(true, true);
             SolrIndexerDriver.solrInputDocumentList.clear();
           }
         } catch (SolrServerException | IOException e) {
           e.printStackTrace();
         }

         return null;
       }
     }
 );


 What should I do so that the sending-to-Solr part executes only after the
 SolrInputDocuments have been added to solrInputDocumentList (and so that it
 also works in cluster mode)?


 --
 Emre Sevinç






-- 
Emre Sevinc


How to deal with code that runs before foreach block in Apache Spark?

2015-05-04 Thread Emre Sevinc
I'm trying to deal with some code that runs differently on Spark
stand-alone mode and Spark running on a cluster. Basically, for each item
in an RDD, I'm trying to add it to a list, and once this is done, I want to
send this list to Solr.

This works perfectly fine when I run the following code in stand-alone mode
of Spark, but does not work when the same code is run on a cluster. When I
run the same code on a cluster, it is as if the "send to Solr" part of the
code is executed before the list to be sent to Solr is filled with items. I
tried to force the execution with solrInputDocumentJavaRDD.collect(); after
foreach, but it seems to have no effect.

// For each RDD
solrInputDocumentJavaDStream.foreachRDD(
    new Function<JavaRDD<SolrInputDocument>, Void>() {
      @Override
      public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception {

        // For each item in a single RDD
        solrInputDocumentJavaRDD.foreach(
            new VoidFunction<SolrInputDocument>() {
              @Override
              public void call(SolrInputDocument solrInputDocument) {
                // Add the solrInputDocument to the list of SolrInputDocuments
                SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
              }
            });

        // Try to force execution
        solrInputDocumentJavaRDD.collect();

        // After having finished adding every SolrInputDocument to the list,
        // add it to the solrServer, and commit, waiting for the commit to be flushed
        try {
          // Seems like when run in cluster mode, the list size is zero,
          // therefore the following part is never executed
          if (SolrIndexerDriver.solrInputDocumentList != null
              && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
            SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
            SolrIndexerDriver.solrServer.commit(true, true);
            SolrIndexerDriver.solrInputDocumentList.clear();
          }
        } catch (SolrServerException | IOException e) {
          e.printStackTrace();
        }

        return null;
      }
    }
);


What should I do so that the sending-to-Solr part executes only after the
SolrInputDocuments have been added to solrInputDocumentList (and so that it
also works in cluster mode)?


-- 
Emre Sevinç


Re: Spark Unit Testing

2015-04-21 Thread Emre Sevinc
Hello James,

Did you check the following resources:

 -
https://github.com/apache/spark/tree/master/streaming/src/test/java/org/apache/spark/streaming

 -
http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs
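
Regarding creating a JavaPairDStream purely for a test: one option is to feed
the streaming context from a queue of RDDs and then map to pairs. Below is a
minimal, hedged sketch (my own assumption of how one could do it with the
Spark 1.x Java API, not an official recipe); the class name and key:value
input format are made up for illustration:

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class PairDStreamTestSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("pair-dstream-test");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

    // Each RDD pushed into the queue becomes one micro-batch of the test stream.
    Queue<JavaRDD<String>> queue = new LinkedList<JavaRDD<String>>();
    queue.add(ssc.sparkContext().parallelize(Arrays.asList("a:1", "b:2")));

    JavaDStream<String> lines = ssc.queueStream(queue);

    // Turn the test stream into the JavaPairDStream<String, String> the class under test expects.
    JavaPairDStream<String, String> pairs = lines.mapToPair(
        new PairFunction<String, String, String>() {
          @Override
          public Tuple2<String, String> call(String line) {
            String[] parts = line.split(":");
            return new Tuple2<String, String>(parts[0], parts[1]);
          }
        });

    pairs.print();
    ssc.start();
    ssc.awaitTermination();
  }
}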

--
Emre Sevinç
http://www.bigindustries.be/


On Tue, Apr 21, 2015 at 1:26 PM, James King jakwebin...@gmail.com wrote:

 I'm trying to write some unit tests for my spark code.

 I need to pass a JavaPairDStreamString, String to my spark class.

 Is there a way to create a JavaPairDStream using Java API?

 Also, is there a good resource that covers an approach (or approaches) for
 unit testing using Java?

 Regards
 jk




-- 
Emre Sevinc


Re: override log4j.properties

2015-04-09 Thread Emre Sevinc
One method: put your custom log4j.properties file in your /resources
directory.

As an example, please see: http://stackoverflow.com/a/2736/236007
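
For illustration, here is a minimal log4j.properties along those lines (my own
sketch, not taken from the linked answer; adjust the appenders and levels to
your needs) that could live in src/main/resources:

# Send everything at WARN and above to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Quiet down Spark's own logging
log4j.logger.org.apache.spark=WARN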

Kind regards,

Emre Sevinç
http://www.bigindustries.be/



On Thu, Apr 9, 2015 at 2:17 PM, patcharee patcharee.thong...@uni.no wrote:

 Hello,

 How to override log4j.properties for a specific spark job?

 BR,
 Patcharee


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Emre Sevinc


Re: Query REST web service with Spark?

2015-04-01 Thread Emre Sevinc
Hello Minnow,

It is possible. You can, for example, use the Jersey REST client to query a
web service and get its results in a Spark job. In fact, that's what we did
in a recent project (in a Spark Streaming application).
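
To give an idea of the shape of such a job, here is a rough, hedged sketch (my
own illustration, not the code from our project) that calls a REST endpoint
from within mapPartitions; the endpoint URL and the input RDD of IDs are
hypothetical, and plain HttpURLConnection is used instead of Jersey to keep
the example dependency-free:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

public class RestLookupSketch {
  public static JavaRDD<String> augment(JavaRDD<String> ids) {
    // mapPartitions keeps per-record overhead down: one task handles a whole partition.
    return ids.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
      @Override
      public Iterable<String> call(Iterator<String> idIterator) throws Exception {
        List<String> results = new ArrayList<String>();
        while (idIterator.hasNext()) {
          String id = idIterator.next();
          // Hypothetical REST endpoint; replace with the real service URL.
          URL url = new URL("http://rest.example.com/api/records/" + id);
          HttpURLConnection conn = (HttpURLConnection) url.openConnection();
          conn.setRequestMethod("GET");
          BufferedReader reader =
              new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
          StringBuilder body = new StringBuilder();
          String line;
          while ((line = reader.readLine()) != null) {
            body.append(line);
          }
          reader.close();
          conn.disconnect();
          results.add(body.toString());
        }
        return results;
      }
    });
  }
}

Note that the executors make the HTTP calls, so the service has to be able to
handle the degree of parallelism you configure.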

Kind regards,

Emre Sevinç
http://www.bigindustries.be/



On Tue, Mar 31, 2015 at 10:46 PM, Minnow Noir minnown...@gmail.com wrote:

 We have some data on Hadoop that needs to be augmented with data only
 available to us via a REST service.  We're using Spark to search for, and
 correct, missing data. Even though there are a lot of records to scour for
 missing data, the total number of calls to the service is expected to be
 low, so it would be ideal to do the whole job in Spark as we scour the data.

 I don't see anything obvious in the API or on Google relating to making
 REST calls from a Spark job.  Is it possible?

 Thanks,

 Alec




-- 
Emre Sevinc


Re: log4j.properties in jar

2015-03-31 Thread Emre Sevinc
Hello Udit,

Yes, what you ask is possible. If you follow the Spark documentation and
tutorial about how to build stand-alone applications, you can see that it
is possible to build a stand-alone, über-JAR file that includes everything.

For example, if you want to suppress some messages by modifying log4j in
unit tests, you can do the following:
http://stackoverflow.com/questions/27248997/how-to-suppress-spark-logging-in-unit-tests/2736#2736

Hope this helps.

--
Emre Sevinç
http://www.bigindustries.be/


On Mon, Mar 30, 2015 at 10:24 PM, Udit Mehta ume...@groupon.com wrote:

 Hi,


 Is it possible to put the log4j.properties in the application jar such
 that the driver and the executors use this log4j file? Do I need to specify
 anything while submitting my app so that this file is used?

 Thanks,
 Udit




-- 
Emre Sevinc


Re: Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?

2015-03-24 Thread Emre Sevinc
Hello Sandy,

Thank you for your explanation. Then I would at least expect that behavior to
be consistent across local, yarn-client, and yarn-cluster modes (and not lead
to a situation where it somehow works in two of them but not in the third).
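
For anyone hitting the same issue, a possible workaround (my own sketch based
on Sandy's explanation, not something stated elsewhere in this thread) is to
prefix the custom key with "spark." so that spark-submit forwards it in every
deploy mode, and then read it back from SparkConf; "spark.myapp.key" below is
a hypothetical name:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ConfWorkaroundSketch {
  public static void main(String[] args) {
    // Submit with: spark-submit --conf spark.myapp.key=someValue ...
    SparkConf sparkConf = new SparkConf().setAppName("conf-workaround");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    // Properties whose names start with "spark." survive in all deploy modes,
    // so they can be read from SparkConf on the driver (with a default as fallback).
    String value = sparkConf.get("spark.myapp.key", "defaultValue");
    System.out.println("spark.myapp.key = " + value);

    sc.stop();
  }
}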

Kind regards,

Emre Sevinç
http://www.bigindustries.be/


On Tue, Mar 24, 2015 at 4:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Ah, yes, I believe this is because only properties prefixed with "spark."
 get passed on.  The purpose of the --conf option is to allow passing
 Spark properties to the SparkConf, not to add general key-value pairs to
 the JVM system properties.

 -Sandy

 On Tue, Mar 24, 2015 at 4:25 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello Sandy,

 Your suggestion does not work when I try it locally:

 When I pass

   --conf key=someValue

 and then try to retrieve it like:

 SparkConf sparkConf = new SparkConf();
 logger.info("* * * key ~~~ {}", sparkConf.get("key"));

 I get

   Exception in thread "main" java.util.NoSuchElementException: key

 And I think that's expected because the key is an arbitrary one, not
 necessarily a Spark configuration element. This is why I was passing it via
 --conf and retrieving it with System.getProperty("key") (which worked locally
 and in yarn-client mode but not in yarn-cluster mode). I'm surprised that I
 can't use it on the cluster while I can use it during local development and
 testing.

 Kind regards,

 Emre Sevinç
 http://www.bigindustries.be/



 On Mon, Mar 23, 2015 at 6:15 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Emre,

 The --conf property is meant to work with yarn-cluster mode.
 System.getProperty("key") isn't guaranteed, but new SparkConf().get("key")
 should.  Does it not?

 -Sandy

 On Mon, Mar 23, 2015 at 8:39 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 According to Spark Documentation at
 https://spark.apache.org/docs/1.2.1/submitting-applications.html :

   --conf: Arbitrary Spark configuration property in key=value format.
 For values that contain spaces wrap “key=value” in quotes (as shown).

 And indeed, when I use that parameter, in my Spark program I can
 retrieve the value of the key by using:

 System.getProperty("key");

 This works when I test my program locally, and also in yarn-client
 mode, I can log the value of the key and see that it matches what I wrote
 in the command line, but it returns *null* when I submit the very same
 program in *yarn-cluster* mode.

 Why can't I retrieve the value of key given as --conf key=value when
 I submit my Spark application in *yarn-cluster* mode?

 Any ideas and/or workarounds?


 --
 Emre Sevinç
 http://www.bigindustries.be/





 --
 Emre Sevinc





-- 
Emre Sevinc


Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?

2015-03-23 Thread Emre Sevinc
Hello,

According to Spark Documentation at
https://spark.apache.org/docs/1.2.1/submitting-applications.html :

  --conf: Arbitrary Spark configuration property in key=value format. For
values that contain spaces wrap “key=value” in quotes (as shown).

And indeed, when I use that parameter, in my Spark program I can retrieve
the value of the key by using:

System.getProperty("key");

This works when I test my program locally, and also in yarn-client mode, I
can log the value of the key and see that it matches what I wrote in the
command line, but it returns *null* when I submit the very same program in
*yarn-cluster* mode.

Why can't I retrieve the value of key given as --conf key=value when I
submit my Spark application in *yarn-cluster* mode?

Any ideas and/or workarounds?


-- 
Emre Sevinç
http://www.bigindustries.be/


Re: log files of failed task

2015-03-23 Thread Emre Sevinc
Hello Sergun,

Generally you can use

   yarn application -list

to see the applicationIDs of applications and then you can see the logs
of finished applications using:

   yarn logs -applicationId <applicationId>

Hope this helps.

--
Emre Sevinç
http://www.bigindustries.be/



On Mon, Mar 23, 2015 at 8:23 AM, sergunok ser...@gmail.com wrote:

 Hi,

 I executed a task on Spark in YARN and it failed.
 I see just an "executor lost" message from YARNClientScheduler, with no
 further details.
 (I read this error can be related to the spark.yarn.executor.memoryOverhead
 setting and have already played with this param.)

 How can I dig deeper into the log files and find the exact reason? How can
 the log of a failed task be examined?

 Unfortunately I don't have access to the Spark UI; I can only use the
 command line.

 Thanks!

 Serg.



 --
 View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/log-files-of-failed-task-tp22183.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




--
Emre Sevinc


Re: Writing Spark Streaming Programs

2015-03-19 Thread Emre Sevinc
Hello James,

I've been working with Spark Streaming for the last 6 months, and I'm
coding in Java 7. Even though I haven't encountered any blocking issues
with that combination, I'd definitely pick Scala if the decision was up to
me.

I agree with Gerard and Charles on this one. If you can, go with Scala for
Spark Streaming applications.

Cheers,

Emre Sevinç
http://www.bigindustries.be/



On Thu, Mar 19, 2015 at 4:09 PM, James King jakwebin...@gmail.com wrote:

 Many thanks Gerard, this is very helpful. Cheers!

 On Thu, Mar 19, 2015 at 4:02 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Try writing this Spark Streaming idiom in Java and you'll choose Scala
 soon enough:

 dstream.foreachRDD{ rdd =>
   rdd.foreachPartition( partition => ... )
 }

 When deciding between Java and Scala for Spark, IMHO Scala has the upper
 hand. If you're concerned with readability, have a look at the Scala coding
 style recently open sourced by Databricks:
 https://github.com/databricks/scala-style-guide  (btw, I don't agree with a
 good part of it, but I recognize that it can keep the most complex Scala
 constructions out of your code)



 On Thu, Mar 19, 2015 at 3:50 PM, James King jakwebin...@gmail.com
 wrote:

 Hello All,

 I'm using Spark for streaming, but I'm unclear on which implementation
 language to use: Java, Scala or Python.

 I don't know anything about Python, am familiar with Scala, and have been
 doing Java for a long time.

 I think the above shouldn't influence my decision on which language to
 use, because I believe the tool should fit the problem.

 In terms of performance Java and Scala are comparable. However, Java is
 OO and Scala is FP; I have no idea what Python is.

 If you use Scala without applying a consistent style of programming, Scala
 code can become unreadable, but I do like the fact that it seems possible
 to do so much work with so much less code; that's a strong selling point
 for me. Also, it could be that the type of programming done in Spark is best
 implemented in Scala as an FP language, though I'm not sure.

 The question I would like your help with is: are there any other
 considerations I need to think about when deciding this? Are there any
 recommendations you can make in this regard?

 Regards
 jk











-- 
Emre Sevinc


Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processing multi-line JSON?

2015-03-04 Thread Emre Sevinc
I've also tried the following:

Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");

JavaStreamingContext ssc =
    JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration,
        factory, false);


but I still get the same exception.

Why does getOrCreate ignore that Hadoop configuration (which normally works,
e.g. when not recovering)?

--
Emre


On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I have a Spark Streaming application (that uses Spark 1.2.1) that listens
 to an input directory, and when new JSON files are copied to that directory
 processes them, and writes them to an output directory.

 It uses a 3rd party library to process the multi-line JSON files (
 https://github.com/alexholmes/json-mapreduce). You can see the relevant
 part of the streaming application at:

   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a

 When I run this application locally, it works perfectly fine. But then I
 wanted to test whether it could recover from failure, e.g. if I stopped it
 right in the middle of processing some files. I started the streaming
 application, copied 100 files to the input directory, and hit Ctrl+C when
 it had already processed about 50 files:

 ...
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 [Stage
 0:==
 (65 + 4) / 100]
 ^C

 Then I started the application again, expecting that it could recover from
 the checkpoint. For a while it started to read files again and then gave an
 exception:

 ...
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
  * * * hadoopConfiguration: itemSet
 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0
 (TID 0)
 java.io.IOException: Missing configuration value for
 multilinejsoninputformat.member
 at
 com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Since the exception refers to a missing configuration value for
 "multilinejsoninputformat.member", I think it is about the following line:

    ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet");

 And this is why I also log its value, and as you can see above, just
 before it gives the exception in the recovery process, it shows that
 "multilinejsoninputformat.member" is set to "itemSet". But somehow it is
 not found during the recovery. This exception happens only when it tries to
 recover from a previously interrupted run.

 I've also tried moving the above line into the createContext method, but
 still had the same exception.

 Why is that?

 And how can I work around it?

 --
 Emre Sevinç
 http://www.bigindustries.be/




-- 
Emre Sevinc


Is FileInputDStream returned by fileStream method a reliable receiver?

2015-03-04 Thread Emre Sevinc
Is FileInputDStream returned by fileStream method a reliable receiver?

In the Spark Streaming Guide it says:

  There can be two kinds of data sources based on their *reliability*.
Sources (like Kafka and Flume) allow the transferred data to be
acknowledged. If the system receiving data from these *reliable* sources
acknowledge the received data correctly, it can be ensured that no data
gets lost due to any kind of failure. This leads to two kinds of receivers.

   1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges a
   reliable source that the data has been received and stored in Spark with
   replication.
   2. *Unreliable Receiver* - These are receivers for sources that do not
   support acknowledging. Even for reliable sources, one may implement an
   unreliable receiver that do not go into the complexity of acknowledging
   correctly.


So I wonder whether the receivers for HDFS (and the local file system) are
reliable, e.g. when I'm using the fileStream method to process files in a
directory locally or on HDFS?


-- 
Emre Sevinç


Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processing multi-line JSON?

2015-03-04 Thread Emre Sevinc
I'm adding this 3rd party library to my Maven pom.xml file so that it's
embedded into the JAR I send to spark-submit:

  <dependency>
    <groupId>json-mapreduce</groupId>
    <artifactId>json-mapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>
    <exclusions>
      <exclusion>
        <groupId>javax.servlet</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>commons-io</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>commons-lang</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
      </exclusion>
    </exclusions>
  </dependency>


Then I build my über JAR, and then I run my Spark Streaming application via
the command line:

 spark-submit --class com.example.schemavalidator.SchemaValidatorDriver
--master local[4] --deploy-mode client target/myapp-1.0-SNAPSHOT.jar

--
Emre Sevinç


On Wed, Mar 4, 2015 at 11:19 AM, Tathagata Das t...@databricks.com wrote:

 That could be a corner case bug. How do you add the 3rd party library to
 the class path of the driver? Through spark-submit? Could you give the
 command you used?

 TD

 On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 I've also tried the following:

 Configuration hadoopConfiguration = new Configuration();
 hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");

 JavaStreamingContext ssc =
     JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration,
         factory, false);


 but I still get the same exception.

 Why does getOrCreate ignore that Hadoop configuration (which normally
 works, e.g. when not recovering)?

 --
 Emre


 On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have a Spark Streaming application (that uses Spark 1.2.1) that
 listens to an input directory, and when new JSON files are copied to that
 directory processes them, and writes them to an output directory.

 It uses a 3rd party library to process the multi-line JSON files (
 https://github.com/alexholmes/json-mapreduce). You can see the relevant
 part of the streaming application at:

   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a

 When I run this application locally, it works perfectly fine. But then I
 wanted to test whether it could recover from failure, e.g. if I stopped it
 right in the middle of processing some files. I started the streaming
 application, copied 100 files to the input directory, and hit Ctrl+C when
 it had already processed about 50 files:

 ...
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 [Stage
 0:==
 (65 + 4) / 100]
 ^C

 Then I started the application again, expecting that it could recover
 from the checkpoint. For a while it started to read files again and then
 gave an exception:

 ...
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
  * * * hadoopConfiguration: itemSet
 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage
 0.0 (TID 0)
 java.io.IOException: Missing configuration value for
 multilinejsoninputformat.member
 at
 com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run

Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processing multi-line JSON?

2015-03-03 Thread Emre Sevinc
Hello,

I have a Spark Streaming application (that uses Spark 1.2.1) that listens
to an input directory, and when new JSON files are copied to that directory
processes them, and writes them to an output directory.

It uses a 3rd party library to process the multi-line JSON files (
https://github.com/alexholmes/json-mapreduce). You can see the relevant
part of the streaming application at:

  https://gist.github.com/emres/ec18ee264e4eb0dd8f1a

When I run this application locally, it works perfectly fine. But then I
wanted to test whether it could recover from failure, e.g. if I stopped it
right in the middle of processing some files. I started the streaming
application, copied 100 files to the input directory, and hit Ctrl+C when
it had already processed about 50 files:

...
2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
process : 1
[Stage
0:==
(65 + 4) / 100]
^C

Then I started the application again, expecting that it could recover from
the checkpoint. For a while it started to read files again and then gave an
exception:

...
2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
 * * * hadoopConfiguration: itemSet
2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0
(TID 0)
java.io.IOException: Missing configuration value for
multilinejsoninputformat.member
at
com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Since the exception refers to a missing configuration value for
"multilinejsoninputformat.member", I think it is about the following line:

   ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet");

And this is why I also log its value, and as you can see above, just
before it gives the exception in the recovery process, it shows that
"multilinejsoninputformat.member" is set to "itemSet". But somehow it is not
found during the recovery. This exception happens only when it tries to
recover from a previously interrupted run.

I've also tried moving the above line into the createContext method, but
still had the same exception.

Why is that?

And how can I work around it?

-- 
Emre Sevinç
http://www.bigindustries.be/


Re: Issues reading in Json file with spark sql

2015-03-02 Thread Emre Sevinc
According to Spark SQL Programming Guide:

jsonFile - loads data from a directory of JSON files where each line of the
files is a JSON object.

Note that the file that is offered as jsonFile is not a typical JSON file.
Each line must contain a separate, self-contained valid JSON object. As a
consequence, a regular multi-line JSON file will most often fail.
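
For example (my own illustration, not from the original question), a file that
jsonFile can read would have one complete, self-contained JSON object per
line, like:

{"name": "f1", "type": "string"}
{"name": "f2", "type": "string"}

whereas a single pretty-printed object spread over several lines, such as the
.avsc file quoted in the original question below, does not fit that expectation.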

--
Emre Sevinç
http://www.bigindustries.be
On Mar 2, 2015 8:29 PM, kpeng1 kpe...@gmail.com wrote:

 Hi All,

 I am currently having issues reading in a json file using spark sql's api.
 Here is what the json file looks like:
 {
   "namespace": "spacey",
   "name": "namer",
   "type": "record",
   "fields": [
     {"name":"f1","type":["null","string"]},
     {"name":"f2","type":["null","string"]},
     {"name":"f3","type":["null","string"]},
     {"name":"f4","type":["null","string"]},
     {"name":"f5","type":["null","string"]},
     {"name":"f6","type":["null","string"]},
     {"name":"f7","type":["null","string"]},
     {"name":"f8","type":["null","string"]},
     {"name":"f9","type":["null","string"]},
     {"name":"f10","type":["null","string"]},
     {"name":"f11","type":["null","string"]},
     {"name":"f12","type":["null","string"]},
     {"name":"f13","type":["null","string"]},
     {"name":"f14","type":["null","string"]},
     {"name":"f15","type":["null","string"]}
   ]
 }

 This is what I am doing to read in the json file (using Spark SQL in the
 spark shell on CDH 5.3):

 val sqlsc = new org.apache.spark.sql.SQLContext(sc)
 val j = sqlsc.jsonFile("/tmp/try.avsc")


 This is what I am getting as an error:

 15/03/02 11:23:45 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 12,
 10.0.2.15): scala.MatchError: namespace (of class java.lang.String)
 at

 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305)
 at

 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at

 scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
 at
 scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853)
 at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851)
 at
 org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
 at
 org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.1 in stage 3.0 (TID
 14, 10.0.2.15, ANY, 1308 bytes)
 15/03/02 11:23:45 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID
 13) in 128 ms on 10.0.2.15 (1/2)
 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.1 in stage 3.0 (TID 14)
 on executor 10.0.2.15: scala.MatchError (namespace (of class
 java.lang.String)) [duplicate 1]
 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.2 in stage 3.0 (TID
 15, 10.0.2.15, ANY, 1308 bytes)
 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.2 in stage 3.0 (TID 15)
 on executor 10.0.2.15: scala.MatchError (namespace (of class
 java.lang.String)) [duplicate 2]
 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.3 in stage 3.0 (TID
 16, 10.0.2.15, ANY, 1308 bytes)
 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.3 in stage 3.0 (TID 16)
 on executor 10.0.2.15: scala.MatchError (namespace (of class
 java.lang.String)) [duplicate 3]
 15/03/02 11:23:45 ERROR TaskSetManager: Task 0 in stage 3.0 failed 4 times;
 aborting job
 15/03/02 11:23:45 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks
 have all completed, from pool
 15/03/02 11:23:45 INFO TaskSchedulerImpl: Cancelling stage 3
 15/03/02 11:23:45 INFO DAGScheduler: Job 3 failed: reduce at
 JsonRDD.scala:57, took 0.210707 s
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
 in
 stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0
 (TID 16, 10.0.2.15): scala.MatchError: namespace (of class
 java.lang.String)
 at

 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305)
 at

 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at 

Re: Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread Emre Sevinc
On Thu, Feb 26, 2015 at 4:20 PM, Sean Owen so...@cloudera.com wrote:

 Yea we discussed this on the list a short while ago. The extra
 overhead of count() is pretty minimal. Still you could wrap this up as
 a utility method. There was even a proposal to add some 'materialize'
 method to RDD.


I would definitely like to vote for that proposal.

--
Emre


Re: Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread Emre Sevinc
Hello Sean,

Thank you for your advice. Based on your suggestion, I've modified the code
into the following (and once again admired the easy (!) verbosity of Java
compared to 'complex and hard to understand' brevity (!) of Scala):

javaDStream.foreachRDD(
    new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        stringJavaRDD.foreachPartition(
            new VoidFunction<Iterator<String>>() {
              @Override
              public void call(Iterator<String> iteratorString) {
                return;
              }
            }
        );

        return null;
      }
    });


I've tested the above in my application and also observed it with VisualVM,
but could not see a dramatic speed difference (or a significant heap usage
difference) compared to my initial version where I just use .count() in a
foreachRDD block.

Nevertheless I'll make more experiments to see if differences come up in
terms of speed/memory.

Kind regards,

Emre Sevinç
http://www.bigindustries.be/





On Thu, Feb 26, 2015 at 2:34 PM, Sean Owen so...@cloudera.com wrote:

 Those do quite different things. One counts the data; the other copies
 all of the data to the driver.

 The fastest way to materialize an RDD that I know of is
 foreachPartition(i => None)  (or an equivalent no-op VoidFunction in
 Java)

 On Thu, Feb 26, 2015 at 1:28 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:
  Hello,
 
  I have a piece of code to force the materialization of RDDs in my Spark
  Streaming program, and I'm trying to understand which method is faster
 and
  has less memory consumption:
 
   javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
     @Override
     public Void call(JavaRDD<String> stringJavaRDD) throws Exception {

       //stringJavaRDD.collect();

       // or count?

       //stringJavaRDD.count();

       return null;
     }
   });
 
 
  I've checked the source code of Spark at
 
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
 ,
  and see that collect() is defined as:
 
   def collect(): Array[T] = {
     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
     Array.concat(results: _*)
   }
 
  and count() defined as:
 
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
 
  Therefore I think calling the count() method is faster and/or consumes
 less
  memory, but I wanted to be sure.
 
  Anyone cares to comment?
 
 
  --
  Emre Sevinç
  http://www.bigindustries.be/
 




-- 
Emre Sevinc


Re: Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread Emre Sevinc
Francois,

Thank you for quickly verifying.

Kind regards,
Emre Sevinç

On Thu, Feb 26, 2015 at 2:32 PM, francois.garil...@typesafe.com wrote:

 The short answer:
 count(), as the sum can be partially aggregated on the mappers.

 The long answer:

 http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html

 —
 FG


 On Thu, Feb 26, 2015 at 2:28 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

  Hello,

 I have a piece of code to force the materialization of RDDs in my Spark
 Streaming program, and I'm trying to understand which method is faster and
 has less memory consumption:

   javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
     @Override
     public Void call(JavaRDD<String> stringJavaRDD) throws Exception {

       //stringJavaRDD.collect();

       // or count?

       //stringJavaRDD.count();

       return null;
     }
   });


 I've checked the source code of Spark at
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala,
 and see that collect() is defined as:

   def collect(): Array[T] = {
     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
     Array.concat(results: _*)
   }

 and count() defined as:

   def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

 Therefore I think calling the count() method is faster and/or consumes
 less memory, but I wanted to be sure.

 Anyone cares to comment?


 --
 Emre Sevinç
 http://www.bigindustries.be/





-- 
Emre Sevinc


Which one is faster / consumes less memory: collect() or count()?

2015-02-26 Thread Emre Sevinc
Hello,

I have a piece of code to force the materialization of RDDs in my Spark
Streaming program, and I'm trying to understand which method is faster and
has less memory consumption:

  javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {

      //stringJavaRDD.collect();

      // or count?

      //stringJavaRDD.count();

      return null;
    }
  });


I've checked the source code of Spark at
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala,
and see that collect() is defined as:

  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

and count() defined as:

  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

Therefore I think calling the count() method is faster and/or consumes less
memory, but I wanted to be sure.

Anyone cares to comment?


-- 
Emre Sevinç
http://www.bigindustries.be/


Re: Get filename in Spark Streaming

2015-02-24 Thread Emre Sevinc
Hello Subacini,

Until someone more knowledgeable suggests a better, more straightforward,
and simpler approach with a working code snippet, I suggest the following
workaround / hack:

 inputStream.foreachRDD { rdd =>
   val myStr = rdd.toDebugString
   // process the myStr string value, e.g. using regular expressions
 }

For example, if you print myStr, you can see in your log / console output
something similar to:

15/02/24 15:14:56 INFO FileInputFormat: Total input paths to process : 1
15/02/24 15:14:56 INFO JobScheduler: Added jobs for time 1424787295000 ms
15/02/24 15:14:56 INFO JobScheduler: Starting job streaming job
1424787295000 ms.0 from job set of time 1424787295000 ms
(20) MappedRDD[27] at textFileStream at kmeans.scala:17 []
 |   UnionRDD[26] at textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL14.json NewHadoopRDD[6] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL11.json NewHadoopRDD[7] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL10.json NewHadoopRDD[8] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL6.json NewHadoopRDD[9] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL8.json NewHadoopRDD[10] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL5.json NewHadoopRDD[11] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL1.json NewHadoopRDD[12] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL9.json NewHadoopRDD[13] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL2.json NewHadoopRDD[14] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL16.json NewHadoopRDD[15] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL20.json NewHadoopRDD[16] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL12.json NewHadoopRDD[17] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL4.json NewHadoopRDD[18] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL19.json NewHadoopRDD[19] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL7.json NewHadoopRDD[20] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL17.json NewHadoopRDD[21] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL18.json NewHadoopRDD[22] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL3.json NewHadoopRDD[23] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL13.json NewHadoopRDD[24] at
textFileStream at kmeans.scala:17 []
 |   file:/home/emre/data/train/newsMessageNL15.json NewHadoopRDD[25] at
textFileStream at kmeans.scala:17 []
15/02/24 15:14:56 INFO JobScheduler: Finished job streaming job
1424787295000 ms.0 from job set of time 1424787295000 ms
15/02/24 15:14:56 INFO JobScheduler: Total delay: 1.420 s for time
1424787295000 ms (execution: 0.051 s)
15/02/24 15:14:56 INFO MappedRDD: Removing RDD 5 from persistence list
15/02/24 15:14:56 INFO BlockManager: Removing RDD 5
15/02/24 15:14:56 INFO FileInputDStream: Cleared 0 old files that were
older than 1424787235000 ms:
15/02/24 15:14:56 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

You can process the string to retrieve each section that starts with "file:"
and ends with a space, as sketched below. Then for each such string you can
get your timestamp from the file name.
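
Here is a rough sketch (my own illustration, only an assumption about how you
might parse it) of extracting those paths with a regular expression; the
sample debug string is abbreviated:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DebugStringFileNames {
  public static void main(String[] args) {
    String myStr = "|   file:/home/emre/data/train/newsMessageNL14.json NewHadoopRDD[6] at ...";

    // Match every "file:" path up to the next whitespace character.
    Pattern filePattern = Pattern.compile("file:(\\S+)");
    Matcher matcher = filePattern.matcher(myStr);
    while (matcher.find()) {
      String path = matcher.group(1);
      // From a name like ABC_1421893256000.txt you could then pull out the
      // timestamp digits with another regex or a substring.
      System.out.println("Found input file: " + path);
    }
  }
}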

--
Emre Sevinç
http://www.bigindustries.be/





On Fri, Feb 6, 2015 at 9:33 PM, Subacini B subac...@gmail.com wrote:

 Thank you Emre, this helps, I am able to get the filename.

 But I am not sure how to fit this into the DStream RDD.

 val inputStream = ssc.textFileStream("/hdfs Path/")

 inputStream is a DStream of RDDs, and in foreachRDD I am doing my processing:

  inputStream.foreachRDD(rdd => {
    // how to get the filename here??
  })


 Can you please help.


 On Thu, Feb 5, 2015 at 11:15 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 Did you check the following?


 http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/

 http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-td6551.html

 --
 Emre Sevinç


 On Fri, Feb 6, 2015 at 2:16 AM, Subacini B subac...@gmail.com wrote:

 Hi All,

 We have filenames with a timestamp, say ABC_1421893256000.txt, and the
 timestamp needs to be extracted from the file name for further processing.
 Is there a way to get the input file name picked up by the Spark Streaming job?

 Thanks in advance

 Subacini




 --
 Emre Sevinc





-- 
Emre Sevinc


Re: Can you add Big Industries to the Powered by Spark page?

2015-02-24 Thread Emre Sevinc
Hello,

Thanks for adding, but the URL seems to have a typo: when I click it, it
tries to open

http//www.bigindustries.be/

But it should be:

http://www.bigindustries.be/

Kind regards,

Emre Sevinç
http://http//www.bigindustries.be/
 On Feb 25, 2015 12:29 AM, Patrick Wendell pwend...@gmail.com wrote:

 I've added it, thanks!

 On Fri, Feb 20, 2015 at 12:22 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:
 
  Hello,
 
  Could you please add Big Industries to the Powered by Spark page at
  https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ?
 
 
  Company Name: Big Industries
 
  URL:  http://http://www.bigindustries.be/
 
  Spark Components: Spark Streaming
 
  Use Case: Big Content Platform
 
  Summary: The Big Content Platform is a business-to-business content asset
  management service providing a searchable, aggregated source of live news
  feeds, public domain media and archives of content.
 
  The platform is founded on Apache Hadoop, uses the HDFS filesystem,
 Apache
  Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally,
 the
  platform leverages public datasets like Freebase, DBpedia, Wiktionary,
 and
  Geonames to support semantic text enrichment.
 
 
 
  Kind regards,
 
  Emre Sevinç
  http://www.bigindustries.be/
 



Re: Where to look for potential causes for Akka timeout errors in a Spark Streaming Application?

2015-02-23 Thread Emre Sevinc
Hello Todd,

Thank you for your suggestion! I first tried increasing the driver memory to
2 GB and it worked without any problems, but I will also test with the
parameters and values you've shared.
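
(For reference, and as an assumption about how others might reproduce this
rather than our exact command, the driver memory can be raised at submit time
with the standard flag, e.g.:

  spark-submit --driver-memory 2g --class com.example.MyDriver myapp.jar

where the class and jar names are placeholders.)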

Kind regards,
Emre Sevinç
http://www.bigindustries.be/


On Fri, Feb 20, 2015 at 3:25 PM, Todd Nist tsind...@gmail.com wrote:

 Hi Emre,

 Have you tried adjusting these:

  .set("spark.akka.frameSize", "500")
  .set("spark.akka.askTimeout", "30")
  .set("spark.core.connection.ack.wait.timeout", "600")

 -Todd

 On Fri, Feb 20, 2015 at 8:14 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 We are building a Spark Streaming application that listens to a directory
 on HDFS, and uses the SolrJ library to send newly detected files to a Solr
 server. When we put 10,000 files into the directory it is listening to, it
 starts to process them by sending the files to our Solr server, but after
 a few thousand files the Spark Streaming application dies.

 Before the application dies, it gives some TimeoutException errors
 related to Akka, such as:

   util.AkkaUtils: Error sending message in 1 attempts
   java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]
   akka.pattern.AskTimeoutException: Timed out

 Any ideas on how to deal with this? Should we add/change/tweak some Spark
 configuration parameters?

 We're using Spark 1.2.0 on a YARN cluster, and we're giving 4 cores and
 2 GB of memory to the application when invoking it via the spark-submit command.

 Below you can read the last few lines of the log file, showing what our
 Spark Streaming application logged just before it died:


 15/02/20 13:28:25 INFO rdd.NewHadoopRDD: Input split:
 hdfs://node01.demo.hadoop:8020/user/bjorn/spark-belga/dbpedia-translator-out/2fdf95f1-67d6-40b7-9345-fe129e38a2d9.json:0+2620
 15/02/20 13:28:25 INFO broadcast.TorrentBroadcast: Started reading
 broadcast variable 3004
 15/02/20 13:28:32 INFO storage.MemoryStore: ensureFreeSpace(20996) called
 with curMem=31171148, maxMem=794647
 15/02/20 13:28:32 INFO storage.MemoryStore: Block broadcast_3004_piece0
 stored as bytes in memory (estimated size 20.5 KB, free 1030.5 MB)
 15/02/20 13:28:33 INFO storage.BlockManagerMaster: Updated info of block
 broadcast_3004_piece0
 15/02/20 13:28:33 INFO broadcast.TorrentBroadcast: Reading broadcast
 variable 3004 took 7897 ms
 15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(347363)
 called with curMem=31192144, maxMem=794647
 15/02/20 13:28:33 INFO storage.MemoryStore: Block broadcast_3004 stored
 as values in memory (estimated size 339.2 KB, free 1030.2 MB)
 15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(2627) called
 with curMem=31539507, maxMem=794647
 15/02/20 13:28:33 INFO storage.MemoryStore: Block rdd_3659_3 stored as
 bytes in memory (estimated size 2.6 KB, free 1030.2 MB)
 15/02/20 13:28:34 INFO storage.BlockManagerMaster: Updated info of block
 rdd_3659_3
 15/02/20 13:28:34 INFO impl.HttpClientUtil: Creating new http client,
 config:maxConnections=128maxConnectionsPerHost=32followRedirects=false
 15/02/20 13:28:36 INFO storage.MemoryStore: ensureFreeSpace(5) called
 with curMem=31542134, maxMem=794647
 15/02/20 13:28:36 INFO storage.MemoryStore: Block rdd_3660_3 stored as
 bytes in memory (estimated size 5.0 B, free 1030.2 MB)
 15/02/20 13:28:40 INFO storage.BlockManagerMaster: Updated info of block
 rdd_3660_3
 15/02/20 13:28:40 INFO executor.Executor: Finished task 3.0 in stage
 245.0 (TID 3455). 2516 bytes result sent to driver
 15/02/20 13:29:07 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
 15/02/20 13:29:08 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkexecu...@node08.demo.hadoop:48042] -
 [akka.tcp://sparkdri...@node07.demo.hadoop:56535] disassociated!
 Shutting down.

 LogType: stdout
 LogLength: 0
 Log Contents:



 Container: container_1422006251277_0837_01_04 on
 node08.demo.hadoop_8041

 ==
 LogType: stderr
 LogLength: 2952
 Log Contents:
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in
 [jar:file:/mnt/disk1/cm/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in
 [jar:file:/mnt/disk3/yarn/nm/usercache/bjorn/filecache/354/bigcontent-1.0

Re: Streaming Linear Regression

2015-02-20 Thread Emre Sevinc
Hello Baris,

Giving your complete source code (if not very long, or maybe via
https://gist.github.com/) could be more helpful.

Also, telling us which Spark version you use, on which file system, and how
you run your application, together with any log / output info it produces,
might make collective debugging easier.

--
Emre Sevinç
http://www.bigindustries.be/



On Thu, Feb 19, 2015 at 9:01 PM, barisak baris.akg...@gmail.com wrote:

 Hi

 I tried to run Streaming Linear Regression on my local machine.

 val trainingData =
   ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse)

 textFileStream is not seeing the new files. I searched on the Internet and
 saw that somebody has the same issue, but no solution was found for it.

 Does anyone have an opinion on this? Has anybody been able to get streaming
 linear regression running?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-tp21726.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Emre Sevinc


Can you add Big Industries to the Powered by Spark page?

2015-02-20 Thread Emre Sevinc
Hello,

Could you please add Big Industries to the Powered by Spark page at
https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ?


Company Name: Big Industries

URL:  http://http://www.bigindustries.be/

Spark Components: Spark Streaming

Use Case: Big Content Platform

Summary: The Big Content Platform is a business-to-business content asset
management service providing a searchable, aggregated source of live news
feeds, public domain media and archives of content.

The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache
Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the
platform leverages public datasets like Freebase, DBpedia, Wiktionary, and
Geonames to support semantic text enrichment.



Kind regards,

Emre Sevinç
http://www.bigindustries.be/


Where to look for potential causes for Akka timeout errors in a Spark Streaming Application?

2015-02-20 Thread Emre Sevinc
Hello,

We are building a Spark Streaming application that listens to a directory
on HDFS, and uses the SolrJ library to send newly detected files to a Solr
server. When we put 10,000 files into the directory it is listening to, it
starts to process them by sending the files to our Solr server, but after
a few thousand files the Spark Streaming application dies.

Before the application dies, it gives some TimeoutException errors related
to Akka, such as:

  util.AkkaUtils: Error sending message in 1 attempts
  java.util.concurrent.TimeoutException: Futures timed out after [30
seconds]
  akka.pattern.AskTimeoutException: Timed out

Any ideas on how to deal with this? Should we add/change/tweak some Spark
configuration parameters?

We're using Spark 1.2.0 on a YARN cluster, and we're giving 4 cores and 2GB
of memory to that application when invoking it via spark-submit command.
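
For example, we wonder whether increasing the Akka timeouts would help, along
the lines of the sketch below (the class name is a placeholder, the values are
guesses, and spark.akka.askTimeout is an undocumented setting that may or may
not be the relevant one here):

  spark-submit --class com.example.MyStreamingApp \
    --conf spark.akka.timeout=300 \
    --conf spark.akka.askTimeout=120 \
    ... myApp.jar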

Below you can read the last few lines of the log file, showing what our
Spark Streaming application logged just before it died:


15/02/20 13:28:25 INFO rdd.NewHadoopRDD: Input split:
hdfs://node01.demo.hadoop:8020/user/bjorn/spark-belga/dbpedia-translator-out/2fdf95f1-67d6-40b7-9345-fe129e38a2d9.json:0+2620
15/02/20 13:28:25 INFO broadcast.TorrentBroadcast: Started reading
broadcast variable 3004
15/02/20 13:28:32 INFO storage.MemoryStore: ensureFreeSpace(20996) called
with curMem=31171148, maxMem=794647
15/02/20 13:28:32 INFO storage.MemoryStore: Block broadcast_3004_piece0
stored as bytes in memory (estimated size 20.5 KB, free 1030.5 MB)
15/02/20 13:28:33 INFO storage.BlockManagerMaster: Updated info of block
broadcast_3004_piece0
15/02/20 13:28:33 INFO broadcast.TorrentBroadcast: Reading broadcast
variable 3004 took 7897 ms
15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(347363) called
with curMem=31192144, maxMem=794647
15/02/20 13:28:33 INFO storage.MemoryStore: Block broadcast_3004 stored as
values in memory (estimated size 339.2 KB, free 1030.2 MB)
15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(2627) called
with curMem=31539507, maxMem=794647
15/02/20 13:28:33 INFO storage.MemoryStore: Block rdd_3659_3 stored as
bytes in memory (estimated size 2.6 KB, free 1030.2 MB)
15/02/20 13:28:34 INFO storage.BlockManagerMaster: Updated info of block
rdd_3659_3
15/02/20 13:28:34 INFO impl.HttpClientUtil: Creating new http client,
config:maxConnections=128maxConnectionsPerHost=32followRedirects=false
15/02/20 13:28:36 INFO storage.MemoryStore: ensureFreeSpace(5) called with
curMem=31542134, maxMem=794647
15/02/20 13:28:36 INFO storage.MemoryStore: Block rdd_3660_3 stored as
bytes in memory (estimated size 5.0 B, free 1030.2 MB)
15/02/20 13:28:40 INFO storage.BlockManagerMaster: Updated info of block
rdd_3660_3
15/02/20 13:28:40 INFO executor.Executor: Finished task 3.0 in stage 245.0
(TID 3455). 2516 bytes result sent to driver
15/02/20 13:29:07 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/20 13:29:08 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkexecu...@node08.demo.hadoop:48042] -
[akka.tcp://sparkdri...@node07.demo.hadoop:56535] disassociated! Shutting
down.

LogType: stdout
LogLength: 0
Log Contents:



Container: container_1422006251277_0837_01_04 on node08.demo.hadoop_8041
==
LogType: stderr
LogLength: 2952
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/mnt/disk1/cm/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/mnt/disk3/yarn/nm/usercache/bjorn/filecache/354/bigcontent-1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/02/20 13:29:26 INFO executor.CoarseGrainedExecutorBackend: Registered
signal handlers for [TERM, HUP, INT]
15/02/20 13:29:27 INFO spark.SecurityManager: Changing view acls to:
yarn,bjorn
15/02/20 13:29:27 INFO spark.SecurityManager: Changing modify acls to:
yarn,bjorn
15/02/20 13:29:27 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(yarn, bjorn); users with modify permissions: 

Re: Streaming Linear Regression

2015-02-20 Thread Emre Sevinc
Baris,

I've tried the following piece of code:

https://gist.github.com/emres/10c509c1d69264fe6fdb

and built it using

sbt package

and then submitted it via

  spark-submit --class
org.apache.spark.examples.mllib.StreamingLinearRegression --master local[4]
target/scala-2.10/streaminglinearregression_2.10-1.0.jar

And once it started to run, I've waited for a few seconds, and then I've
copied a few files to

   /home/emre/data/train

And observed the log output on my console:

 15/02/20 13:08:35 INFO FileInputDStream: Finding new files took 29 ms
15/02/20 13:08:35 INFO FileInputDStream: New files at time 1424434115000 ms:
file:/home/emre/data/train/newsMessageNL14.json
file:/home/emre/data/train/newsMessageNL11.json
file:/home/emre/data/train/newsMessageNL10.json
file:/home/emre/data/train/newsMessageNL6.json
file:/home/emre/data/train/newsMessageNL8.json
file:/home/emre/data/train/newsMessageNL5.json
file:/home/emre/data/train/newsMessageNL1.json
file:/home/emre/data/train/newsMessageNL9.json
file:/home/emre/data/train/newsMessageNL2.json
file:/home/emre/data/train/newsMessageNL16.json
file:/home/emre/data/train/newsMessageNL20.json
file:/home/emre/data/train/newsMessageNL12.json
file:/home/emre/data/train/newsMessageNL4.json
file:/home/emre/data/train/newsMessageNL19.json
file:/home/emre/data/train/newsMessageNL7.json
file:/home/emre/data/train/newsMessageNL17.json
file:/home/emre/data/train/newsMessageNL18.json
file:/home/emre/data/train/newsMessageNL3.json
file:/home/emre/data/train/newsMessageNL13.json
file:/home/emre/data/train/newsMessageNL15.json
15/02/20 13:08:35 INFO MemoryStore: ensureFreeSpace(214074) called with
curMem=0, maxMem=278019440

The contents of the JSON files of course don't make sense in this context
(building a linear regression model); I've used them only to check whether
the system detects new files, and as can be seen above, it does.

You can start from the source code I've shared, which is detecting new
files, and continue to build your particular streaming linear regression
application.

--
Emre Sevinç
http://www.bigindustries.be



On Thu, Feb 19, 2015 at 9:01 PM, barisak baris.akg...@gmail.com wrote:

 Hi

 I tried to run Streaming Linear Regression in my local.

 val trainingData =

 ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse)

 textFileStream is not seeing the new files. I searched on the Internet and
 saw that somebody had the same issue, but no solution was found for it.

 Does anyone have an opinion on this? Has anybody managed to get streaming
 linear regression running?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-tp21726.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Emre Sevinc


In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and

2015-02-19 Thread Emre Sevinc
Hello,

We have a Spark Streaming application that watches an input directory, and
as files are copied there the application reads them, sends the contents
to a RESTful web service, receives a response, and writes some contents to an
output directory.

When testing the application by copying a few thousand files at once to its
input directory, we realized that after processing about 3800 files it
produces messages like the following in the log file:

15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size
9960 dropped from memory (free 447798720)
15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)

and then the Spark Streaming application dies.

What might be the potential causes to check for such errors?

Below you can see last few lines before it dies:


15/02/19 10:22:03 INFO broadcast.TorrentBroadcast: Started reading
broadcast variable 12894
15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(20978) called
with curMem=107884847, maxMem=556038881
15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894_piece0
stored as bytes in memory (estimated size 20.5 KB, free 427.4 MB)
15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block
broadcast_12894_piece0
15/02/19 10:22:04 INFO broadcast.TorrentBroadcast: Reading broadcast
variable 12894 took 460 ms
15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(347363) called
with curMem=107905825, maxMem=556038881
15/02/19 10:22:04 INFO storage.MemoryStore: Block broadcast_12894 stored as
values in memory (estimated size 339.2 KB, free 427.0 MB)
15/02/19 10:22:04 INFO storage.MemoryStore: ensureFreeSpace(1079) called
with curMem=108253188, maxMem=556038881
15/02/19 10:22:04 INFO storage.MemoryStore: Block rdd_30466_35 stored as
bytes in memory (estimated size 1079.0 B, free 427.0 MB)
15/02/19 10:22:04 INFO storage.BlockManagerMaster: Updated info of block
rdd_30466_35
15/02/19 10:22:05 INFO storage.MemoryStore: ensureFreeSpace(5) called with
curMem=108254267, maxMem=556038881
15/02/19 10:22:05 INFO storage.MemoryStore: Block rdd_30467_35 stored as
bytes in memory (estimated size 5.0 B, free 427.0 MB)
15/02/19 10:22:05 INFO storage.BlockManagerMaster: Updated info of block
rdd_30467_35
15/02/19 10:22:05 INFO executor.Executor: Finished task 35.0 in stage 351.0
(TID 12229). 2353 bytes result sent to driver
15/02/19 10:22:06 INFO storage.BlockManager: Removing broadcast 17935
15/02/19 10:22:06 INFO storage.BlockManager: Removing block
broadcast_17935_piece0
15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935_piece0 of
size 4151 dropped from memory (free 447788760)
15/02/19 10:22:06 INFO storage.BlockManagerMaster: Updated info of block
broadcast_17935_piece0
15/02/19 10:22:06 INFO storage.BlockManager: Removing block broadcast_17935
15/02/19 10:22:06 INFO storage.MemoryStore: Block broadcast_17935 of size
9960 dropped from memory (free 447798720)
15/02/19 10:22:55 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/19 10:23:28 WARN util.AkkaUtils: Error sending message in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/19 10:24:01 WARN util.AkkaUtils: Error sending message in 3 attempts

Re: In a Spark Streaming application, what might be the potential causes for util.AkkaUtils: Error sending message in 1 attempts and java.util.concurrent.TimeoutException: Futures timed out and

2015-02-19 Thread Emre Sevinc
On Thu, Feb 19, 2015 at 12:27 PM, Tathagata Das t...@databricks.com wrote:

 What version of Spark are you using?

 TD



Spark version is 1.2.0 (running on Cloudera CDH 5.3.0)


--
Emre Sevinç


Re: Spark Streaming output cannot be used as input?

2015-02-18 Thread Emre Sevinc
Hello Jose,

We hit the same issue a couple of months ago. It is possible to write
directly to files instead of creating directories, but it is not
straightforward, and I haven't seen any clear demonstration in books,
tutorials, etc.

We do something like:

SparkConf sparkConf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(batchInterval));
JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
MyModuleApp.process(stream);

And then in the process method:

@Override
public void process(JavaDStream<String> inStream) {
  JavaDStream<String> json = inStream.map(
      new MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir));
  forceOutput(json);
}


This, in turn, calls the following (I've removed the irrelevant lines
to focus on writing):


public class MyModuleWorker implements Function<String, String> {

  public String call(String json) {
    // process the data and then write it
    writeJSON(json, validatedJSONoutputDir_);
    return json; // (remaining lines removed to focus on writing)
  }
}

And the writeJSON method is:

public static final void writeJSON(String json, String jsonDirPath) throws IOException {
  // Write to a temporary ".json.tmp" file first...
  String jsonFileName = jsonDirPath + "/" + UUID.randomUUID().toString() + ".json.tmp";
  URI uri = URI.create(jsonFileName);
  Configuration conf = new Configuration();
  FileSystem fileSystem = FileSystem.get(uri, conf);
  FSDataOutputStream out = fileSystem.create(new Path(uri));
  out.write(json.getBytes(StandardCharsets.UTF_8));
  out.close();
  // ...then rename it to ".json" so that readers only ever see complete files.
  fileSystem.rename(new Path(uri),
      new Path(URI.create(jsonDirPath + "/" + UUID.randomUUID().toString() + ".json")));
}


Using a similar technique you might be able to achieve your objective.

Kind regards,

Emre Sevinç
http://www.bigindustries.be/



On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez jfernan...@sdl.com wrote:

  Hello folks,



 Our intended use case is:

 -  Spark Streaming app #1 reads from RabbitMQ and output to HDFS

 -  Spark Streaming app #2 reads #1’s output and stores the data
 into Elasticsearch



 The idea behind this architecture is that if Elasticsearch is down due to
 an upgrade or system error we don’t have to stop reading messages from the
 queue. We could also scale each process separately as needed.



 After a few hours research my understanding is that Spark Streaming
 outputs files in a *directory* for which you provide the prefix and suffix.
 This is despite the ScalaDoc for DStream saveAsObjectFiles suggesting
 otherwise:



  /**
   * Save each RDD in this DStream as a Sequence file of serialized objects.
   * The file name at each batch interval is generated based on `prefix` and
   * `suffix`: "prefix-TIME_IN_MS.suffix".
   */



 Spark Streaming can monitor an HDFS directory for files but subfolders are
 not supported. So as far as I can tell, it is not possible to use Spark
 Streaming output as input for a different Spark Streaming app without
 somehow performing a separate operation in the middle.



 Am I missing something obvious? I’ve read some suggestions like using
 Hadoop to merge the directories (whose names I don’t see how you would
 know) and to reduce the partitions to 1 (which wouldn’t help).



 Any other suggestions? What is the expected pattern a developer would
 follow that would make Spark Streaming’s output format usable?









-- 
Emre Sevinc


Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-18 Thread Emre Sevinc
Hello Imran,

(a) I know that all 20 files are processed when I use foreachRDD, because I
can see the processed files in the output directory. (My application logic
writes them to an output directory after they are processed, *but* that
writing operation does not happen in foreachRDD, below you can see the URL
that includes my code and clarifies this).

(b) I know only 16 files are processed because in the output directory I
see only 16 files processed. I wait for minutes and minutes and no more
files appear in the output directory. When I see only 16 files are
processed and Spark Streaming went to the mode of idly watching the input
directory, and then if I copy a few more files, they are also processed.

(c) Sure, you can see part of my code in the following gist:
https://gist.github.com/emres/0fb6de128baea099e741
 It might seem a little convoluted at first, because my application is
divided into two classes, a Driver class (setting up things and
initializing them), and a Worker class (that implements the core
functionality). I've also put the relevant methods from my utility
classes for completeness.

I am as perplexed as you are as to why forcing the output via foreachRDD
resulted in different behaviour compared to simply using the print() method.

Kind regards,
Emre



On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras...@cloudera.com wrote:

 Hi Emre,

 there shouldn't be any difference in which files get processed w/ print()
 vs. foreachRDD().  In fact, if you look at the definition of print(), it is
 just calling foreachRDD() underneath.  So there is something else going on
 here.

 We need a little more information to figure out exactly what is going on.
  (I think Sean was getting at the same thing ...)

 (a) how do you know that when you use foreachRDD, all 20 files get
 processed?

 (b) How do you know that only 16 files get processed when you print()? Do
 you know the other files are being skipped, or maybe they are just stuck
 somewhere?  eg., suppose you start w/ 20 files, and you see 16 get
 processed ... what happens after you add a few more files to the
 directory?  Are they processed immediately, or are they never processed
 either?

 (c) Can you share any more code of what you are doing to the dstreams
 *before* the print() / foreachRDD()?  That might give us more details about
 what the difference is.

 I can't see how .count.println() would be different than just println(),
 but maybe I am missing something also.

 Imran

 On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Sean,

 In this case, I've been testing the code on my local machine and using
 Spark locally, so all the log output was available on my terminal. And
 I've used the .print() method to have an output operation, just to force
 Spark to execute.

 And I was not using foreachRDD, I was only using print() method on a
 JavaDStream object, and it was working fine for a few files, up to 16 (and
 without print() it did not do anything because there were no output
 operations).

 To sum it up, in my case:

  - Initially, use .print() and no foreachRDD: processes up to 16 files
 and does not do anything for the remaining 4.
  - Remove .print() and use foreachRDD: processes all of the 20 files.

 Maybe, as in Akhil Das's suggestion, using .count.print() might also have
 fixed my problem, but I'm satisfied with foreachRDD approach for now.
 (Though it is still a mystery to me why using .print() had a difference,
 maybe my mental model of Spark is wrong, I thought no matter what output
 operation I used, the number of files processed by Spark would be
 independent of that because the processing is done in a different method,
 .print() is only used to force Spark execute that processing, am I wrong?).

 --
 Emre


 On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote:

 Materialization shouldn't be relevant. The collect by itself doesn't let
 you detect whether it happened. Print should print some results to the
 console but on different machines, so may not be a reliable way to see what
 happened.

 Yes I understand your real process uses foreachRDD and that's what you
 should use. It sounds like that works. But you must always have been using
 that right? What do you mean that you changed to use it?

 Basically I'm not clear on what the real code does and what about the
 output of that code tells you only 16 files were processed.
 On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Sean,

 I did not understand your question very well, but what I do is checking
 the output directory (and I have various logger outputs at various stages
 showing the contents of an input file being processed, the response from
 the web service, etc.).

 By the way, I've already solved my problem by using foreachRDD instead
 of print (see my second message in this thread). Apparently forcing Spark
 to materialize DAG via print() is not the way to go. (My interpretation
 might

Re: Re: Problem with 1 master + 2 slaves cluster

2015-02-18 Thread Emre Sevinc
On Wed, Feb 18, 2015 at 10:23 AM, bit1...@163.com bit1...@163.com wrote:

 Sure, thanks Akhil.
 A further question : Is local file system(file:///) not supported in
 standalone cluster?



FYI: I'm able to write to local file system (via HDFS API and using
file:/// notation) when using Spark.
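
For illustration, a minimal sketch of what I mean, using the plain Hadoop
FileSystem API (the path and class name are just placeholders):

import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalFsWriteExample {
  public static void main(String[] args) throws Exception {
    // A file:/// URI makes the HDFS API write to the local file system.
    URI uri = URI.create("file:///tmp/spark-local-fs-example.txt");
    FileSystem fs = FileSystem.get(uri, new Configuration());
    FSDataOutputStream out = fs.create(new Path(uri));
    out.write("hello from the local file system".getBytes(StandardCharsets.UTF_8));
    out.close();
  }
}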


--
Emre Sevinç
http://www.bigindustries.be/


Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-18 Thread Emre Sevinc
Thanks to everyone for suggestions and explanations.

Currently I've started to experiment with the following scenario, that
seems to work for me:

- Put the properties file on a web server so that it is centrally available
- Pass it to the Spark driver program via --conf 'propertiesFile=http:
//myWebServer.com/mymodule.properties'
- And then load the configuration using Apache Commons Configuration:

PropertiesConfiguration config = new PropertiesConfiguration();
config.load(System.getProperty("propertiesFile"));

Using the method described above, I don't need to compile my properties file
statically into the über JAR anymore. I can modify the file on the web
server, and when I submit my application via spark-submit, passing the URL
of the properties file, the driver program reads the contents of that file
once, retrieves the values of the keys, and continues.
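
For completeness, retrieving an individual value afterwards looks like the
following (the key name is just one from my own properties file, shown as an
illustration):

final String validatedJSONoutputDir = config.getString("job.output.dir");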

PS: I've opted for Apache Commons Configuration because it is already part
of the many dependencies I have in my pom.xml, and I did not want to pull
another library, even though the Typesafe Config library seems to be a
powerful and flexible choice, too.

--
Emre



On Tue, Feb 17, 2015 at 6:12 PM, Charles Feduke charles.fed...@gmail.com
wrote:

 Emre,

 As you are keeping the properties file external to the JAR you need to
 make sure to submit the properties file as an additional --files (or
 whatever the necessary CLI switch is) so all the executors get a copy of
 the file along with the JAR.

 If you know you are going to just put the properties file on HDFS then why
 don't you define a custom system setting like properties.url and pass it
 along:

 (this is for Spark shell, the only CLI string I have available at the
 moment:)

 spark-shell --jars $JAR_NAME \
 --conf 'properties.url=hdfs://config/stuff.properties' \
 --conf
 'spark.executor.extraJavaOptions=-Dproperties.url=hdfs://config/stuff.properties'

 ... then load the properties file during initialization by examining the
 properties.url system setting.

 I'd still strongly recommend Typesafe Config as it makes this a lot less
 painful, and I know for certain you can place your *.conf at a URL (using
 the -Dconfig.url=) though it probably won't work with an HDFS URL.



 On Tue Feb 17 2015 at 9:53:08 AM Gerard Maas gerard.m...@gmail.com
 wrote:

 +1 for TypeSafe config
 Our practice is to include all spark properties under a 'spark' entry in
 the config file alongside job-specific configuration:

 A config file would look like:
 spark {
  master = 
  cleaner.ttl = 123456
  ...
 }
 job {
 context {
 src = foo
 action = barAction
 }
 prop1 = val1
 }

 Then, to create our Spark context, we transparently pass the spark
 section to a SparkConf instance.
 This idiom will instantiate the context with the spark specific
 configuration:


 sparkConfig.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))

 And we can make use of the config object everywhere else.

 We use the override model of the typesafe config: reasonable defaults go
 in the reference.conf (within the jar). Environment-specific overrides go
 in the application.conf (alongside the job jar) and hacks are passed with
 -Dprop=value :-)


 -kr, Gerard.


 On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 I've decided to try

   spark-submit ... --conf
 spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties

 But when I try to retrieve the value of propertiesFile via

 System.err.println("propertiesFile : " +
 System.getProperty("propertiesFile"));

 I get NULL:

propertiesFile : null

 Interestingly, when I run spark-submit with --verbose, I see that it
 prints:

  spark.driver.extraJavaOptions ->
 -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties

 I couldn't understand why I couldn't get to the value of
 propertiesFile by using standard System.getProperty method. (I can use
 new SparkConf().get(spark.driver.extraJavaOptions)  and manually parse
 it, and retrieve the value, but I'd like to know why I cannot retrieve that
 value using System.getProperty method).

 Any ideas?

 If I can achieve what I've described above properly, I plan to pass a
 properties file that resides on HDFS, so that it will be available to my
 driver program wherever that program runs.

 --
 Emre




 On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke 
 charles.fed...@gmail.com wrote:

 I haven't actually tried mixing non-Spark settings into the Spark
 properties. Instead I package my properties into the jar and use the
 Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
 specific) to get at my properties:

 Properties file: src/main/resources/integration.conf

 (below $ENV might be set to either integration or prod[3])

 ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
 --conf 'config.resource=$ENV.conf' \
 --conf
 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf

[POWERED BY] Can you add Big Industries to the Powered by Spark page?

2015-02-18 Thread Emre Sevinc
Hello,

Could you please add Big Industries to the Powered by Spark page at
https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ?


Company Name: Big Industries

URL:  http://www.bigindustries.be/

Spark Components: Spark Streaming

Use Case: Big Content Platform

Summary: The Big Content Platform is a business-to-business content asset
management service providing a searchable, aggregated source of live news
feeds, public domain media and archives of content.

The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache
Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the
platform leverages public datasets like Freebase, DBpedia, Wiktionary, and
Geonames to support semantic text enrichment.



Kind regards,

Emre Sevinç
http://www.bigindustries.be/


Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Emre Sevinc
Hello Dmitry,

I had almost the same problem and solved it by using version 4.0.0 of SolrJ:

<dependency>
  <groupId>org.apache.solr</groupId>
  <artifactId>solr-solrj</artifactId>
  <version>4.0.0</version>
</dependency>

In my case, I was lucky that version 4.0.0 of SolrJ had all the
functionality I needed.

--
Emre Sevinç
http://www.bigindustries.be/



On Wed, Feb 18, 2015 at 4:39 PM, Dmitry Goldenberg dgoldenberg...@gmail.com
 wrote:

 I think I'm going to have to rebuild Spark with commons.httpclient.version
 set to 4.3.1 which looks to be the version chosen by Solrj, rather than the
 4.2.6 that Spark's pom mentions. Might work.

 On Wed, Feb 18, 2015 at 1:37 AM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 Hi

 Did you try to make maven pick the latest version


 http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management

 That way solrj won't cause any issue, you can try this and check if the
 part of your code where you access HDFS works fine?
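
 For reference, a minimal sketch of the kind of override that page describes
 (the artifact coordinates are the usual httpclient ones; the version is only
 an example, not a tested recommendation):

 <dependencyManagement>
   <dependencies>
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient</artifactId>
       <version>4.3.1</version>
     </dependency>
   </dependencies>
 </dependencyManagement>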



 On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com
 wrote:

 I'm getting the below error when running spark-submit on my class. This
 class
 has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ
 4.10.3 from within the class.

 This is in conflict with the older version, HttpClient 3.1 that's a
 dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).

 I've tried setting spark.files.userClassPathFirst to true in SparkConf
 in my
 program, also setting it to true in
 $SPARK-HOME/conf/spark-defaults.conf as

 spark.files.userClassPathFirst true

 No go, I'm still getting the error, as below. Is there anything else I
 can
 try? Are there any plans in Spark to support multiple class loaders?

 Exception in thread main java.lang.NoSuchMethodError:

 org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
 at

 org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
 at

 org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
 at

 org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
 at

 org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
 at

 org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168)
 at

 org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141)
 ...





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com





-- 
Emre Sevinc


Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Emre Sevinc
On Wed, Feb 18, 2015 at 4:54 PM, Dmitry Goldenberg dgoldenberg...@gmail.com
 wrote:

 Thank you, Emre. It seems solrj still depends on HttpClient 4.1.3; would
 that not collide with Spark/Hadoop's default dependency on HttpClient set
 to 4.2.6? If that's the case that might just solve the problem.

 Would Solrj 4.0.0 work with the latest Solr, 4.10.3?


In my case, it worked; I mean I was trying to send some documents to the
latest version of Solr server (v4.10.3), and using v4.0.0 of SolrJ worked
without any problems so far. I couldn't find any other way to deal with
this old httpclient dependency problem in Spark.

--
Emre Sevinç
http://www.bigindustries.be/


Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-17 Thread Emre Sevinc
I've decided to try

  spark-submit ... --conf
spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties

But when I try to retrieve the value of propertiesFile via

   System.err.println("propertiesFile : " +
System.getProperty("propertiesFile"));

I get NULL:

   propertiesFile : null

Interestingly, when I run spark-submit with --verbose, I see that it prints:

  spark.driver.extraJavaOptions ->
-DpropertiesFile=/home/emre/data/belga/schemavalidator.properties

I couldn't understand why I couldn't get to the value of "propertiesFile"
by using the standard System.getProperty method. (I can use new
SparkConf().get("spark.driver.extraJavaOptions") and manually parse it,
and retrieve the value, but I'd like to know why I cannot retrieve that
value using the System.getProperty method).

Any ideas?

If I can achieve what I've described above properly, I plan to pass a
properties file that resides on HDFS, so that it will be available to my
driver program wherever that program runs.

--
Emre




On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com
wrote:

 I haven't actually tried mixing non-Spark settings into the Spark
 properties. Instead I package my properties into the jar and use the
 Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
 specific) to get at my properties:

 Properties file: src/main/resources/integration.conf

 (below $ENV might be set to either integration or prod[3])

 ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
 --conf 'config.resource=$ENV.conf' \
 --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

 Since the properties file is packaged up with the JAR I don't have to
 worry about sending the file separately to all of the slave nodes. Typesafe
 Config is written in Java so it will work if you're not using Scala. (The
 Typesafe Config also has the advantage of being extremely easy to integrate
 with code that is using Java Properties today.)

 If you instead want to send the file separately from the JAR and you use
 the Typesafe Config library, you can specify config.file instead of
 .resource; though I'd point you to [3] below if you want to make your
 development life easier.

 1. https://github.com/typesafehub/config
 2. https://github.com/ceedubs/ficus
 3.
 http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/



 On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I'm using Spark 1.2.1 and have a module.properties file, and in it I have
 non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

 I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode
 client --verbose --properties-file /home/emre/data/mymodule.properties
 mymodule.jar

 And I thought I could read the value of my non-Spark property, namely,
 job.output.dir by using:

 SparkConf sparkConf = new SparkConf();
 final String validatedJSONoutputDir = sparkConf.get("job.output.dir");

 But it gives me an exception:

 Exception in thread main java.util.NoSuchElementException:
 job.output.dir

 Is it not possible to mix Spark and non-Spark properties in a single
 .properties file, then pass it via --properties-file and then get the
 values of those non-Spark properties via SparkConf?

 Or is there another object / method to retrieve the values for those
 non-Spark properties?


 --
 Emre Sevinç




-- 
Emre Sevinc


Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
I've managed to solve this, but I still don't know exactly why my solution
works:

In my code I was trying to force Spark to produce output via:

  jsonIn.print();

jsonIn being a JavaDStream<String>.

When I removed the code above and added the code below to force the output
operation, and hence the execution:

jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
    stringJavaRDD.collect();
    return null;
  }
});

It works as I expect, processing all of the 20 files I give to it, instead
of stopping at 16.

--
Emre


On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works perfectly
 fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16 files
 end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory, and
 it can process all of the 16 files. That's why I call it magic number 16.

 When I mean it detects all of the files, I mean that in the logs I see the
 following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: 1G
 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves
 to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
 eth0)
 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
 bind to another address
 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
 native-hadoop library for your platform... using builtin-java classes where
 applicable
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
 file:/tmp/receivedBlockMetadata
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Reading from the logs:
 file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
 ---
 Time: 142408626 ms
 ---

 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:31 INFO

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
Sean,

In this case, I've been testing the code on my local machine and using
Spark locally, so all the log output was available on my terminal. And
I've used the .print() method to have an output operation, just to force
Spark to execute.

And I was not using foreachRDD; I was only using the print() method on a
JavaDStream object, and it was working fine for a few files, up to 16 (and
without print() it did not do anything because there were no output
operations).

To sum it up, in my case:

 - Initially, use .print() and no foreachRDD: processes up to 16 files and
does not do anything for the remaining 4.
 - Remove .print() and use foreachRDD: processes all of the 20 files.

Maybe, as in Akhil Das's suggestion, using .count.print() might also have
fixed my problem, but I'm satisfied with the foreachRDD approach for now.
(Though it is still a mystery to me why using .print() made a difference;
maybe my mental model of Spark is wrong. I thought that no matter which
output operation I used, the number of files processed by Spark would be
independent of it, because the processing is done in a different method and
.print() is only used to force Spark to execute that processing. Am I wrong?)
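
For reference, the variant Akhil suggested would presumably look like this in
Java (I have not tested this exact line myself):

  jsonIn.count().print();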

--
Emre


On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote:

 Materialization shouldn't be relevant. The collect by itself doesn't let
 you detect whether it happened. Print should print some results to the
 console but on different machines, so may not be a reliable way to see what
 happened.

 Yes I understand your real process uses foreachRDD and that's what you
 should use. It sounds like that works. But you must always have been using
 that right? What do you mean that you changed to use it?

 Basically I'm not clear on what the real code does and what about the
 output of that code tells you only 16 files were processed.
 On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Sean,

 I did not understand your question very well, but what I do is checking
 the output directory (and I have various logger outputs at various stages
 showing the contents of an input file being processed, the response from
 the web service, etc.).

 By the way, I've already solved my problem by using foreachRDD instead of
 print (see my second message in this thread). Apparently forcing Spark to
 materialize DAG via print() is not the way to go. (My interpretation might
 be wrong, but this is what I've just seen in my case).

 --
 Emre




 On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote:

 How are you deciding whether files are processed or not? It doesn't seem
 possible from this code. Maybe it just seems so.
 On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've managed to solve this, but I still don't know exactly why my
 solution works:

 In my code I was trying to force the Spark to output via:

   jsonIn.print();

 jsonIn being a JavaDStream<String>.

 When removed the code above, and added the code below to force the
 output operation, hence the execution:

 jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
     stringJavaRDD.collect();
     return null;
   }
 });

 It works as I expect, processing all of the 20 files I give to it,
 instead of stopping at 16.

 --
 Emre


 On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works
 perfectly fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16
 files end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory,
 and it can process all of the 16 files. That's why I call it magic number
 16.

 When I mean it detects all of the files, I mean that in the logs I see
 the following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
 1G
 2015-02-16 12:30:51 WARN

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
Hello Sean,

I did not understand your question very well, but what I do is checking the
output directory (and I have various logger outputs at various stages
showing the contents of an input file being processed, the response from
the web service, etc.).

By the way, I've already solved my problem by using foreachRDD instead of
print (see my second message in this thread). Apparently forcing Spark to
materialize DAG via print() is not the way to go. (My interpretation might
be wrong, but this is what I've just seen in my case).

--
Emre




On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote:

 How are you deciding whether files are processed or not? It doesn't seem
 possible from this code. Maybe it just seems so.
 On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've managed to solve this, but I still don't know exactly why my
 solution works:

 In my code I was trying to force the Spark to output via:

   jsonIn.print();

 jsonIn being a JavaDStream<String>.

 When removed the code above, and added the code below to force the output
 operation, hence the execution:

 jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
     stringJavaRDD.collect();
     return null;
   }
 });

 It works as I expect, processing all of the 20 files I give to it,
 instead of stopping at 16.

 --
 Emre


 On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works
 perfectly fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16 files
 end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory,
 and it can process all of the 16 files. That's why I call it magic number
 16.

 When I mean it detects all of the files, I mean that in the logs I see
 the following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
 1G
 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves
 to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
 eth0)
 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
 bind to another address
 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
 native-hadoop library for your platform... using builtin-java classes where
 applicable
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
 file:/tmp/receivedBlockMetadata
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Reading from the logs:
 file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
 ---
 Time: 142408626 ms
 ---

 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO

Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Emre Sevinc
Hello,

I'm using Spark 1.2.1 and have a module.properties file, and in it I have
non-Spark properties, as well as Spark properties, e.g.:

   job.output.dir=file:///home/emre/data/mymodule/out

I'm trying to pass it to spark-submit via:

   spark-submit --class com.myModule --master local[4] --deploy-mode client
--verbose --properties-file /home/emre/data/mymodule.properties
mymodule.jar

And I thought I could read the value of my non-Spark property, namely,
job.output.dir by using:

SparkConf sparkConf = new SparkConf();
final String validatedJSONoutputDir = sparkConf.get("job.output.dir");

But it gives me an exception:

Exception in thread main java.util.NoSuchElementException:
job.output.dir

Is it not possible to mix Spark and non-Spark properties in a single
.properties file, then pass it via --properties-file and then get the
values of those non-Spark properties via SparkConf?

Or is there another object / method to retrieve the values for those
non-Spark properties?


-- 
Emre Sevinç


Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Emre Sevinc
Sean,

I'm trying this as an alternative to what I currently do. Currently I have
my module.properties file for my module in the resources directory, and
that file is put inside the über JAR file when I build my application with
Maven, and then when I submit it using spark-submit, I can read that
module.properties file via the traditional method:


properties.load(MyModule.class.getClassLoader().getResourceAsStream("module.properties"));

and everything works fine. The disadvantage is that in order to make any
changes to that .properties file effective, I have to re-build my
application. Therefore I'm trying to find a way to be able to send that
module.properties file via spark-submit and read the values in it, so that
I will not be forced to build my application every time I want to make a
change in the module.properties file.

I've also checked the --files option of spark-submit, but I see that it
is for sending the listed files to executors (correct me if I'm wrong);
what I'm after is being able to pass dynamic properties (key/value pairs)
to the Driver program of my Spark application. And I still could not find
out how to do that.

--
Emre





On Mon, Feb 16, 2015 at 4:28 PM, Sean Owen so...@cloudera.com wrote:

 Since SparkConf is only for Spark properties, I think it will in
 general only pay attention to and preserve spark.* properties. You
 could experiment with that. In general I wouldn't rely on Spark
 mechanisms for your configuration, and you can use any config
 mechanism you like to retain your own properties.

 On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:
  Hello,
 
  I'm using Spark 1.2.1 and have a module.properties file, and in it I have
  non-Spark properties, as well as Spark properties, e.g.:
 
 job.output.dir=file:///home/emre/data/mymodule/out
 
  I'm trying to pass it to spark-submit via:
 
 spark-submit --class com.myModule --master local[4] --deploy-mode
 client
  --verbose --properties-file /home/emre/data/mymodule.properties
 mymodule.jar
 
  And I thought I could read the value of my non-Spark property, namely,
  job.output.dir by using:
 
  SparkConf sparkConf = new SparkConf();
  final String validatedJSONoutputDir =
 sparkConf.get("job.output.dir");
 
  But it gives me an exception:
 
  Exception in thread main java.util.NoSuchElementException:
  job.output.dir
 
  Is it not possible to mix Spark and non-Spark properties in a single
  .properties file, then pass it via --properties-file and then get the
 values
  of those non-Spark properties via SparkConf?
 
  Or is there another object / method to retrieve the values for those
  non-Spark properties?
 
 
  --
  Emre Sevinç




-- 
Emre Sevinc


Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
Hello,

I have an application in Java that uses Spark Streaming 1.2.1 in the
following manner:

 - Listen to the input directory.
 - If a new file is copied to that input directory process it.
 - Process: contact a RESTful web service (running also locally and
responsive), send the contents of the file, receive the response from the
web service, write the results as a new file into the output directory
 - batch interval : 30 seconds
 - checkpoint interval: 150 seconds

When I test the application locally with 1 or 2 files, it works perfectly
fine as expected. I run it like:

spark-submit --class myClass --verbose --master local[4]
--deploy-mode client myApp.jar /in file:///out
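
To make the setup concrete, here is a minimal, stripped-down sketch of the
structure described above (the class name, checkpoint path, and the no-op
processing step are placeholders, not my actual code):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingSkeleton {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("myClass");
    // 30-second batch interval, as described above
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(30000));
    // checkpoint directory (path is hypothetical)
    ssc.checkpoint("/tmp/spark-checkpoint");
    // watch the input directory given as the first argument
    JavaDStream<String> jsonIn = ssc.textFileStream(args[0]);
    // checkpoint the stream every 150 seconds
    jsonIn.checkpoint(new Duration(150000));
    // stand-in for the real processing (web service call, etc.)
    JavaDStream<String> processed = jsonIn.map(new Function<String, String>() {
      @Override
      public String call(String line) {
        return line; // hypothetical no-op processing
      }
    });
    processed.print(); // the output operation that forces execution
    ssc.start();
    ssc.awaitTermination();
  }
}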

But then I've realized something strange when I copied 20 files to the
INPUT directory: Spark Streaming detects all of the files, but it ends up
processing *only 16 files*. And the remaining 4 are not processed at all.

I've tried it with 19, 18, and then 17 files. Same result, only 16 files
end up in the output directory.

Then I've tried it by copying 16 files at once to the input directory, and
it can process all of the 16 files. That's why I call it magic number 16.

When I say it detects all of the files, I mean that in the logs I see the
following lines when I copy 17 files:

===
2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: 1G
2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves to
a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0)
2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to bind
to another address
2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable
2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
file:/tmp/receivedBlockMetadata
2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Reading from the logs:
file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
---
Time: 142408626 ms
---

2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 142408596:
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 142408596
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 142408596:
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 142408596
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 142408599:
2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 142408599

---

Time: 142408629 ms
---

Documentation error in MLlib - Clustering?

2015-02-13 Thread Emre Sevinc
Hello,

I was trying the streaming kmeans clustering example in the official
documentation at:

   http://spark.apache.org/docs/1.2.0/mllib-clustering.html

But I've got a type error when I tried to compile the code:

[error]  found   : org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]
[error]  required: org.apache.spark.streaming.dstream.DStream[(?, org.apache.spark.mllib.linalg.Vector)]
[error]  model.predictOnValues(testData).print()
[error]                        ^
[error] one error found
[error] (compile:compile) Compilation failed


And it seems like the solution is to use

   model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

as shown in
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala

instead of

   model.predictOnValues(testData).print()

as written in the documentation.

I just wanted to draw the attention to this, so that one of the maintainers
can fix the documentation.

-- 
Emre Sevinç


Re: How to log using log4j to local file system inside a Spark application that runs on YARN?

2015-02-12 Thread Emre Sevinc
Marcelo and Burak,

Thank you very much for your explanations. Now I'm able to see my logs.

On Wed, Feb 11, 2015 at 7:52 PM, Marcelo Vanzin van...@cloudera.com wrote:

 For Yarn, you need to upload your log4j.properties separately from
 your app's jar, because of some internal issues that are too boring to
 explain here. :-)

 Basically:

   spark-submit --master yarn --files log4j.properties blah blah blah

 Having to keep it outside your app jar is sub-optimal, and I think
 there's a bug filed to fix this, but so far no one has really spent
 time looking at it.


 On Wed, Feb 11, 2015 at 4:29 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:
  Hello,
 
  I'm building an Apache Spark Streaming application and cannot make it
 log to
  a file on the local filesystem when running it on YARN. How can I achieve
  this?
 
  I've set up the log4j.properties file so that it can successfully write to a log
  file in the /tmp directory on the local file system (shown below, partially):
 
   log4j.appender.file=org.apache.log4j.FileAppender
   log4j.appender.file.File=/tmp/application.log
   log4j.appender.file.append=false
   log4j.appender.file.layout=org.apache.log4j.PatternLayout
   log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
 
  When I run my Spark application locally by using the following command:
 
   spark-submit --class myModule.myClass --master local[2] --deploy-mode
  client myApp.jar
 
  It runs fine and I can see that log messages are written to
  /tmp/application.log on my local file system.
 
  But when I run the same application via YARN, e.g.
 
   spark-submit --class myModule.myClass --master yarn-client  --name
  myModule --total-executor-cores 1 --executor-memory 1g myApp.jar
 
  or
 
   spark-submit --class myModule.myClass --master yarn-cluster  --name
  myModule --total-executor-cores 1 --executor-memory 1g myApp.jar
 
  I cannot see any /tmp/application.log on the local file system of the
  machine that runs YARN.
 
  What am I missing?
 
 
  --
  Emre Sevinç



 --
 Marcelo




-- 
Emre Sevinc


Re: Get filename in Spark Streaming

2015-02-05 Thread Emre Sevinc
Hello,

Did you check the following?


http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/

http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-td6551.html

--
Emre Sevinç


On Fri, Feb 6, 2015 at 2:16 AM, Subacini B subac...@gmail.com wrote:

 Hi All,

 We have filename with timestamp say ABC_1421893256000.txt and the
 timestamp  needs to be extracted from file name for further processing.Is
 there a way to get input file name  picked up by spark streaming job?

 Thanks in advance

 Subacini




-- 
Emre Sevinc


Re: How to define a file filter for file name patterns in Apache Spark Streaming in Java?

2015-02-03 Thread Emre Sevinc
Hello Akhil,

Thank you for taking your time for a detailed answer. I managed to solve it
in a very similar manner.
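
(For the archive, a rough sketch of the kind of filter that works here — the
class name and the .json pattern are placeholders, and this is only one way
to do it: a named, Serializable Function1 avoids the NotSerializableException
that an anonymous inner class can trigger once the DStream graph gets
checkpointed.)

import java.io.Serializable;
import org.apache.hadoop.fs.Path;
import scala.runtime.AbstractFunction1;

// Hypothetical example: a top-level filter class with no reference to the
// enclosing driver class, so it can be serialized with the DStream graph.
public class JsonPathFilter extends AbstractFunction1<Path, Object>
    implements Serializable {
  @Override
  public Object apply(Path path) {
    // Example pattern: accept only *.json files.
    return path.getName().endsWith(".json");
  }
}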

Kind regards,
Emre Sevinç


On Mon, Feb 2, 2015 at 8:22 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Hi Emre,

 This is how you do that in scala:

 val lines = ssc.fileStream[LongWritable, Text,
 TextInputFormat]("/home/akhld/sigmoid", (t: Path) => true, true)

 In Java you can do something like:

 jssc.ssc().<LongWritable, Text,
 SequenceFileInputFormat>fileStream("/home/akhld/sigmoid", new
 AbstractFunction1<Path, Object>() {
 @Override
 public Boolean apply(Path input) {
 //file filtering logic here

 return true;
 }
 }, true, ClassTag$.MODULE$.apply(LongWritable.class),
 ClassTag$.MODULE$.apply(Text.class),
 ClassTag$.MODULE$.apply(SequenceFileInputFormat.class));


 ​


 Thanks
 Best Regards

 On Mon, Feb 2, 2015 at 6:34 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I'm using Apache Spark Streaming 1.2.0 and trying to define a file filter
 for file names when creating an InputDStream
 https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html
 by invoking the fileStream
 https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29
 method. My code is working perfectly fine when I don't use a file filter,
 e.g. by invoking the other fileStream
 https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29
 method (described here
 https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29:

 https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29
 ).

 According to the documentation of *fileStream* method, I can pass it

   scala.Function1<org.apache.hadoop.fs.Path,Object> filter

 But so far, I could not create a fileFilter. My initial attempts have
 been

 1- Tried to implement it as:

 Function1<Path, Object> fileFilter = new Function1<Path, Object>() {
 @Override
 public Object apply(Path v1) {
   return true;
 }

 @Override
 public <A> Function1<A, Object> compose(Function1<A, Path> g) {
   return Function1$class.compose(this, g);
 }

 @Override
 public <A> Function1<Path, A> andThen(Function1<Object, A> g) {
   return Function1$class.andThen(this, g);
 }
   };

 But apparently my implementation of andThen is wrong, and I couldn't 
 understand how I should implement it. It complains that the anonymous 
 function:

  is not abstract and does not override abstract method 
 <A>andThen$mcVJ$sp(scala.Function1<scala.runtime.BoxedUnit,A>) in 
 scala.Function1

 2- Tried to implement it as:

 Function1<Path, Object> fileFilter = new AbstractFunction1<Path, Object>() {
 @Override
 public Object apply(Path v1) {
   return true;
 }
   };

 This one compiles, but when I run it I get an exception:

 2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1
 java.io.NotSerializableException: myModule$1
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
 at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
 at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
 at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
 at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
 at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
 at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
 at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
 at 
 java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
 at 
 org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
 at 
 org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
 sun.reflect.NativeMethodAccessorImpl.invoke

How to define a file filter for file name patterns in Apache Spark Streaming in Java?

2015-02-02 Thread Emre Sevinc
Hello,

I'm using Apache Spark Streaming 1.2.0 and trying to define a file filter
for file names when creating an InputDStream
https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html
by invoking the fileStream
https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29
method. My code is working perfectly fine when I don't use a file filter,
e.g. by invoking the other fileStream
https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29
method (described here
https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29:
https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29
).

According to the documentation of *fileStream* method, I can pass it

  scala.Function1<org.apache.hadoop.fs.Path,Object> filter

But so far, I could not create a fileFilter. My initial attempts have been

1- Tried to implement it as:

Function1<Path, Object> fileFilter = new Function1<Path, Object>() {
@Override
public Object apply(Path v1) {
  return true;
}

@Override
public <A> Function1<A, Object> compose(Function1<A, Path> g) {
  return Function1$class.compose(this, g);
}

@Override
public <A> Function1<Path, A> andThen(Function1<Object, A> g) {
  return Function1$class.andThen(this, g);
}
  };

But apparently my implementation of andThen is wrong, and I couldn't
understand how I should implement it. It complains that the anonymous
function:

 is not abstract and does not override abstract method
<A>andThen$mcVJ$sp(scala.Function1<scala.runtime.BoxedUnit,A>) in
scala.Function1

2- Tried to implement it as:

Function1<Path, Object> fileFilter = new AbstractFunction1<Path, Object>() {
@Override
public Object apply(Path v1) {
  return true;
}
  };

This one compiles, but when I run it I get an exception:

2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1
java.io.NotSerializableException: myModule$1
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at 
org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
at 
org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184)
at 

Re: Spark streaming - tracking/deleting processed files

2015-02-02 Thread Emre Sevinc
You can utilize the following method:


https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29

It has a parameter:

  newFilesOnly - Should process only new files and ignore existing files in
the directory

And it works as expected.
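
In Java (against Spark Streaming 1.2) the call looks roughly like the sketch
below — the same pattern as the fileStream example elsewhere in this archive,
with newFilesOnly set to false so that files already sitting in the directory
are picked up on (re)start. The directory is a placeholder and the inline
filter simply accepts everything:

// Assumes the same imports as the earlier fileStream example:
// org.apache.hadoop.fs.Path, org.apache.hadoop.io.{LongWritable, Text},
// org.apache.hadoop.mapreduce.lib.input.TextInputFormat,
// scala.runtime.AbstractFunction1, scala.reflect.ClassTag$.
jssc.ssc().<LongWritable, Text, TextInputFormat>fileStream(
    "/path/to/input",
    new AbstractFunction1<Path, Object>() {
      @Override
      public Object apply(Path p) { return true; }  // accept every file
    },
    false,  // newFilesOnly = false: also process pre-existing files
    ClassTag$.MODULE$.apply(LongWritable.class),
    ClassTag$.MODULE$.apply(Text.class),
    ClassTag$.MODULE$.apply(TextInputFormat.class));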

--
Emre Sevinç



On Fri, Jan 30, 2015 at 7:07 PM, ganterm gant...@gmail.com wrote:

 We are running a Spark streaming job that retrieves files from a directory
 (using textFileStream).
 One concern we are having is the case where the job is down but files are
 still being added to the directory.
 Once the job starts up again, those files are not being picked up (since
 they are not new or changed while the job is running) but we would like
 them
 to be processed.
 Is there a solution for that? Is there a way to keep track what files have
 been processed and can we force older files to be picked up? Is there a
 way to delete the processed files?

 Thanks!
 Markus



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tracking-deleting-processed-files-tp21444.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Emre Sevinc


Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con

2015-01-29 Thread Emre Sevinc
Charles,

Thank you very much for another suggestion. Unfortunately I couldn't make
it work that way either. So I downgraded my SolrJ library from 4.10.3 to
4.0.0 [1].

Maybe using Relocating Classes [2] feature of Maven could handle this
issue, but I did not want to complicate my pom.xml further, at least for
now.

1- http://mvnrepository.com/artifact/org.apache.solr/solr-solrj/4.0.0
2-
https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html

--
Emre


On Wed, Jan 28, 2015 at 5:40 PM, Charles Feduke charles.fed...@gmail.com
wrote:

 Yeah, it sounds like your original exclusion of commons-httpclient from
 hadoop-* was correct, but it's still coming in from somewhere.

 Can you try something like this?:

 <dependency>
 <artifactId>commons-http</artifactId>
 <groupId>httpclient</groupId>
 <scope>provided</scope>
 </dependency>

 ref:
 http://stackoverflow.com/questions/4716310/is-there-a-way-to-exclude-a-maven-dependency-globally

 (I don't know if a provided dependency will work without a specific
 version number so I'm just making a guess here.)


 On Wed Jan 28 2015 at 11:24:02 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 When I examine the dependencies again, I see that SolrJ library is using
 v. 4.3.1 of org.apache.httpcomponents:httpclient

 [INFO] +- org.apache.solr:solr-solrj:jar:4.10.3:compile
 [INFO] |  +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile
 ==

 [INFO] |  +- org.apache.httpcomponents:httpcore:jar:4.3:compile
 [INFO] |  +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile
 [INFO] |  +- org.codehaus.woodstox:wstx-asl:jar:3.2.7:compile
 [INFO] |  \- org.noggit:noggit:jar:0.5:compile

 But hadoop-common 2.4.0 is using v. 3.1.1 of
 commons-httpclient:commons-httpclient  :


 +- org.apache.hadoop:hadoop-common:jar:2.4.0:provided
 [INFO] |  +- commons-cli:commons-cli:jar:1.2:compile
 [INFO] |  +- org.apache.commons:commons-math3:jar:3.1.1:provided
 [INFO] |  +- xmlenc:xmlenc:jar:0.52:compile
 [INFO] |  +- commons-httpclient:commons-httpclient:jar:3.1:provided
 ===

 [INFO] |  +- commons-codec:commons-codec:jar:1.4:compile

 So my reasoning was: I have to exclude v. 3.1.1 of
 commons-httpclient:commons-httpclient and force it to use httpclient v.
 4.3.1 that SolrJ declares as a dependency.

 But apparently somehow it does not work, I mean I have also tried your
 latest suggestion (changed the 'exclusion' to org.apache.httpcomponents and
 httpclient), still getting the same exception:


 java.lang.NoSuchMethodError:
 org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
 at
 org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
 at
 org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
 at
 org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
 at
 org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
 at
 org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
 at
 org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
 at
 org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168)
 at
 org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141)


 Maybe it is about Hadoop 2.4.0, but I think this is what is included in
 the binary download of Spark. I've also tried it with Spark 1.2.0 binary
 (pre-built for Hadoop 2.4 and later).

 Or maybe I'm totally wrong, and the problem / fix is something completely
 different?

 --
 Emre




 On Wed, Jan 28, 2015 at 4:58 PM, Charles Feduke charles.fed...@gmail.com
  wrote:

 It looks like you're shading in the Apache HTTP commons library and it's
 a different version than what is expected. (Maybe 4.6.x, based on the
 Javadoc.)

 I see you are attempting to exclude commons-httpclient by using:

 <exclusion>
   <groupId>commons-httpclient</groupId>
   <artifactId>*</artifactId>
 </exclusion>

 in your pom. However, what I think you really want is:

 <exclusion>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpclient</artifactId>
 </exclusion>

 The last time the groupId was commons-httpclient was Aug 2007 as
 version 3.1 (search.maven.com). I hope none of your dependencies rely
 on that particular version. SchemeRegistryFactory was introduced in version
 4.3.1 of httpcomponents so even if by chance one of them did rely on
 commons-httpclient there wouldn't be a class conflict.



 On Wed Jan 28 2015 at 9:19:20 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 This is what I get:

  ./bigcontent-1.0-SNAPSHOT.jar:org/apache/http/impl/conn/SchemeRegistryFactory.class

 (probably because I'm using a self-contained JAR).

 In other words, I'm still stuck.

 --
 Emre


 On Wed, Jan 28, 2015 at 2:47 PM, Charles Feduke

Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con

2015-01-28 Thread Emre Sevinc
This is what I get:

 
./bigcontent-1.0-SNAPSHOT.jar:org/apache/http/impl/conn/SchemeRegistryFactory.class

(probably because I'm using a self-contained JAR).

In other words, I'm still stuck.

--
Emre


On Wed, Jan 28, 2015 at 2:47 PM, Charles Feduke charles.fed...@gmail.com
wrote:

 I deal with problems like this so often across Java applications with
 large dependency trees. Add the shell function at the following link to
 your shell on the machine where your Spark Streaming is installed:

 https://gist.github.com/cfeduke/fe63b12ab07f87e76b38

 Then run in the directory where your JAR files are:

 find-java-class SchemeRegistryFactory

 (I know you said HttpClient, but the error seems to be that an overload or
 method of SchemeRegistryFactory is missing from the class that is loaded by
 the class loader. The class loader loads the first class it finds that
 matches the package/class name coordinates.)

 You'll then be able to zero in on the JAR that is bringing in an older
 version of that class. Once you've done that you can exclude that JAR's
 older dependency in your pom.

 If you find out that the newer version is incompatible you'll have to
 perform some magic with the Maven shade plugin.


 On Wed Jan 28 2015 at 8:00:22 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I'm using *Spark 1.1.0* and *Solr 4.10.3*. I'm getting an exception when
 using *HttpSolrServer* from within Spark Streaming:

 15/01/28 13:42:52 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
 java.lang.NoSuchMethodError: 
 org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
  at 
 org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
  at 
 org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
  at 
 org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
  at 
 org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
  at 
 org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
  at 
 org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
  at 
 org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168)
  at 
 org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141)

 Normally, when I use my utility class that uses SolrJ to connect to a
 Solr server and run it by itself (running it stand-alone without Spark),
 everything works as expected. But when I invoke that utility class inside a
 Spark Streaming application, I get the exception above as soon as it is
 trying to establish a connection to the Solr server. My preliminary
 Internet search led me to believe that some Spark or Hadoop components
 bring an older version of *httpclient*, so I've tried to exclude them in
 my pom.xml.

 But I still get the same exception.

 Any ideas why? Or how can I fix it?

 When I analyze my pom.xml dependencies, I get:

 $ mvn dependency:tree -Ddetail=true | grep http
 [INFO] |  |  \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:provided
 [INFO] |  +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile
 [INFO] |  +- org.apache.httpcomponents:httpcore:jar:4.3:compile
 [INFO] |  +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile

 The whole dependency tree is:

 $ mvn dependency:tree -Ddetail=true
 [INFO] Scanning for projects...
 [INFO]
 [INFO] 
 
 [INFO] Building bigcontent 1.0-SNAPSHOT
 [INFO] 
 
 [INFO]
 [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ bigcontent ---
 [INFO] bigcontent:bigcontent:jar:1.0-SNAPSHOT
 [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.1.1:provided
 [INFO] |  +- org.apache.spark:spark-core_2.10:jar:1.1.1:provided
 [INFO] |  |  +- org.apache.curator:curator-recipes:jar:2.4.0:provided
 [INFO] |  |  |  \- org.apache.curator:curator-framework:jar:2.4.0:provided
 [INFO] |  |  | \- org.apache.curator:curator-client:jar:2.4.0:provided
 [INFO] |  |  +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:provided
 [INFO] |  |  |  +- 
 org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v201105210645:provided
 [INFO] |  |  |  +- 
 org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:provided
 [INFO] |  |  |  |  +- 
 org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:provided
 [INFO] |  |  |  |  \- 
 org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:provided
 [INFO] |  |  |  \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:provided
 [INFO] |  |  | \- 
 org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v201005082020:provided
 [INFO] |  |  |\- 
 org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105071233:provided
 [INFO

Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/sc

2015-01-28 Thread Emre Sevinc
Hello,

I'm using *Spark 1.1.0* and *Solr 4.10.3*. I'm getting an exception when
using *HttpSolrServer* from within Spark Streaming:

15/01/28 13:42:52 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError:
org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
at 
org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
at 
org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
at 
org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
at 
org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
at 
org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
at 
org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
at 
org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168)
at 
org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141)

Normally, when I use my utility class that uses SolrJ to connect to a Solr
server and run it by itself (running it stand-alone without Spark),
everything works as expected. But when I invoke that utility class inside a
Spark Streaming application, I get the exception above as soon as it is
trying to establish a connection to the Solr server. My preliminary
Internet search led me to believe that some Spark or Hadoop components
bring an older version of *httpclient*, so I've tried to exclude them in my
pom.xml.

But I still get the same exception.

Any ideas why? Or how can I fix it?

When I analyze my pom.xml dependencies, I get:

$ mvn dependency:tree -Ddetail=true | grep http
[INFO] |  |  \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:provided
[INFO] |  +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile
[INFO] |  +- org.apache.httpcomponents:httpcore:jar:4.3:compile
[INFO] |  +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile

The whole dependency tree is:

$ mvn dependency:tree -Ddetail=true
[INFO] Scanning for projects...
[INFO]
[INFO] 
[INFO] Building bigcontent 1.0-SNAPSHOT
[INFO] 
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ bigcontent ---
[INFO] bigcontent:bigcontent:jar:1.0-SNAPSHOT
[INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.1.1:provided
[INFO] |  +- org.apache.spark:spark-core_2.10:jar:1.1.1:provided
[INFO] |  |  +- org.apache.curator:curator-recipes:jar:2.4.0:provided
[INFO] |  |  |  \- org.apache.curator:curator-framework:jar:2.4.0:provided
[INFO] |  |  | \- org.apache.curator:curator-client:jar:2.4.0:provided
[INFO] |  |  +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:provided
[INFO] |  |  |  +-
org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v201105210645:provided
[INFO] |  |  |  +- org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:provided
[INFO] |  |  |  |  +- org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:provided
[INFO] |  |  |  |  \-
org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:provided
[INFO] |  |  |  \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:provided
[INFO] |  |  | \-
org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v201005082020:provided
[INFO] |  |  |\-
org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105071233:provided
[INFO] |  |  +- org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:provided
[INFO] |  |  +- org.eclipse.jetty:jetty-util:jar:8.1.14.v20131031:provided
[INFO] |  |  +- org.apache.commons:commons-lang3:jar:3.3.2:provided
[INFO] |  |  +- org.slf4j:jul-to-slf4j:jar:1.7.5:provided
[INFO] |  |  +- org.slf4j:jcl-over-slf4j:jar:1.7.5:provided
[INFO] |  |  +- com.ning:compress-lzf:jar:1.0.0:provided
[INFO] |  |  +- net.jpountz.lz4:lz4:jar:1.2.0:provided
[INFO] |  |  +- com.twitter:chill_2.10:jar:0.3.6:provided
[INFO] |  |  |  \- com.esotericsoftware.kryo:kryo:jar:2.21:provided
[INFO] |  |  | +-
com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:provided
[INFO] |  |  | +- com.esotericsoftware.minlog:minlog:jar:1.2:provided
[INFO] |  |  | \- org.objenesis:objenesis:jar:1.2:provided
[INFO] |  |  +- com.twitter:chill-java:jar:0.3.6:provided
[INFO] |  |  +-
org.spark-project.akka:akka-remote_2.10:jar:2.2.3-shaded-protobuf:provided
[INFO] |  |  |  +-
org.spark-project.akka:akka-actor_2.10:jar:2.2.3-shaded-protobuf:provided
[INFO] |  |  |  |  \- com.typesafe:config:jar:1.0.2:provided
[INFO] |  |  |  +-
org.spark-project.protobuf:protobuf-java:jar:2.4.1-shaded:provided
[INFO] |  |  |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:provided
[INFO] |  |  +-
org.spark-project.akka:akka-slf4j_2.10:jar:2.2.3-shaded-protobuf:provided
[INFO] |  |  +- 

Re: Exception when using HttpSolrServer (httpclient) from within Spark Streaming: java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/con

2015-01-28 Thread Emre Sevinc
When I examine the dependencies again, I see that SolrJ library is using v.
4.3.1 of org.apache.httpcomponents:httpclient

[INFO] +- org.apache.solr:solr-solrj:jar:4.10.3:compile
[INFO] |  +- org.apache.httpcomponents:httpclient:jar:4.3.1:compile
==
[INFO] |  +- org.apache.httpcomponents:httpcore:jar:4.3:compile
[INFO] |  +- org.apache.httpcomponents:httpmime:jar:4.3.1:compile
[INFO] |  +- org.codehaus.woodstox:wstx-asl:jar:3.2.7:compile
[INFO] |  \- org.noggit:noggit:jar:0.5:compile

But hadoop-common 2.4.0 is using v. 3.1.1 of
commons-httpclient:commons-httpclient  :

+- org.apache.hadoop:hadoop-common:jar:2.4.0:provided
[INFO] |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  +- org.apache.commons:commons-math3:jar:3.1.1:provided
[INFO] |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  +- commons-httpclient:commons-httpclient:jar:3.1:provided
===
[INFO] |  +- commons-codec:commons-codec:jar:1.4:compile

So my reasoning was: I have to exclude v. 3.1.1 of
commons-httpclient:commons-httpclient and force it to use httpclient v.
4.3.1 that SolrJ declares as a dependency.

But apparently somehow it does not work, I mean I have also tried your
latest suggestion (changed the 'exclusion' to org.apache.httpcomponents and
httpclient), still getting the same exception:

java.lang.NoSuchMethodError:
org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
at
org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
at
org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
at
org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
at
org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
at
org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
at
org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
at
org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:168)
at
org.apache.solr.client.solrj.impl.HttpSolrServer.init(HttpSolrServer.java:141)


Maybe it is about Hadoop 2.4.0, but I think this is what is included in the
binary download of Spark. I've also tried it with Spark 1.2.0 binary
(pre-built for Hadoop 2.4 and later).

Or maybe I'm totally wrong, and the problem / fix is something completely
different?

--
Emre




On Wed, Jan 28, 2015 at 4:58 PM, Charles Feduke charles.fed...@gmail.com
wrote:

  It looks like you're shading in the Apache HTTP commons library and it's a
  different version than what is expected. (Maybe 4.6.x, based on the Javadoc.)

 I see you are attempting to exclude commons-httpclient by using:

  <exclusion>
    <groupId>commons-httpclient</groupId>
    <artifactId>*</artifactId>
  </exclusion>

 in your pom. However, what I think you really want is:

  <exclusion>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
  </exclusion>

 The last time the groupId was commons-httpclient was Aug 2007 as version
 3.1 (search.maven.com). I hope none of your dependencies rely on that
 particular version. SchemeRegistryFactory was introduced in version 4.3.1
 of httpcomponents so even if by chance one of them did rely on
 commons-httpclient there wouldn't be a class conflict.



 On Wed Jan 28 2015 at 9:19:20 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 This is what I get:

   ./bigcontent-1.0-SNAPSHOT.jar:org/apache/http/impl/conn/SchemeRegistryFactory.class

 (probably because I'm using a self-contained JAR).

 In other words, I'm still stuck.

 --
 Emre


 On Wed, Jan 28, 2015 at 2:47 PM, Charles Feduke charles.fed...@gmail.com
  wrote:

 I deal with problems like this so often across Java applications with
 large dependency trees. Add the shell function at the following link to
 your shell on the machine where your Spark Streaming is installed:

 https://gist.github.com/cfeduke/fe63b12ab07f87e76b38

 Then run in the directory where your JAR files are:

 find-java-class SchemeRegistryFactory

  (I know you said HttpClient, but the error seems to be that an overload or
  method of SchemeRegistryFactory is missing from the class that is loaded by
  the class loader. The class loader loads the first class it finds that
  matches the package/class name coordinates.)

  You'll then be able to zero in on the JAR that is bringing in an older
  version of that class. Once you've done that you can exclude that JAR's
  older dependency in your pom.

 If you find out that the newer version is incompatible you'll have to
 perform some magic with the Maven shade plugin.


 On Wed Jan 28 2015 at 8:00:22 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I'm using *Spark 1.1.0* and *Solr 4.10.3*. I'm getting an exception
 when using *HttpSolrServer* from within Spark Streaming:

 15/01/28 13:42:52 ERROR Executor

Why does consuming a RESTful web service (using javax.ws.rs.* and Jsersey) work in unit test but not when submitted to Spark?

2014-12-24 Thread Emre Sevinc
Hello,

I have a piece of code that runs inside Spark Streaming and tries to get
some data from a RESTful web service (that runs locally on my machine). The
code snippet in question is:

 Client client = ClientBuilder.newClient();
 WebTarget target = client.target("http://localhost:/rest");
 target = target.path("annotate")
 .queryParam("text",
UrlEscapers.urlFragmentEscaper().escape(spotlightSubmission))
 .queryParam("confidence", 0.3);

  logger.warn("!!! DEBUG !!! target: {}", target.getUri().toString());

  String response =
target.request().accept(MediaType.APPLICATION_JSON_TYPE).get(String.class);

  logger.warn("!!! DEBUG !!! Spotlight response: {}", response);

When run inside a unit test as follows:

 mvn clean test -Dtest=SpotlightTest#testCountWords

it contacts the RESTful web service and retrieves some data as expected.
But when the same code is run as part of the application that is submitted
to Spark, using spark-submit script I receive the following error:

  java.lang.NoSuchMethodError:
javax.ws.rs.core.MultivaluedMap.addAll(Ljava/lang/Object;[Ljava/lang/Object;)V

I'm using Spark 1.1.0 and for consuming the web service I'm using Jersey in
my project's pom.xml:

 <dependency>
  <groupId>org.glassfish.jersey.containers</groupId>
  <artifactId>jersey-container-servlet-core</artifactId>
  <version>2.14</version>
</dependency>

So I suspect that when the application is submitted to Spark, somehow
there's a different JAR in the environment that uses a different version of
Jersey / javax.ws.rs.*

Does anybody know which version of Jersey / javax.ws.rs.*  is used in the
Spark environment, or how to solve this conflict?


-- 
Emre Sevinç
https://be.linkedin.com/in/emresevinc/


Re: Why does consuming a RESTful web service (using javax.ws.rs.* and Jsersey) work in unit test but not when submitted to Spark?

2014-12-24 Thread Emre Sevinc
On Wed, Dec 24, 2014 at 1:46 PM, Sean Owen so...@cloudera.com wrote:

 I'd take a look with 'mvn dependency:tree' on your own code first.
 Maybe you are including JavaEE 6 for example?


For reference, my complete pom.xml looks like:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>bigcontent</groupId>
  <artifactId>bigcontent</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>bigcontent</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <configuration>
          <!-- put your configurations here -->
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.1.1</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.glassfish.jersey.containers</groupId>
      <artifactId>jersey-container-servlet-core</artifactId>
      <version>2.14</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.4.0</version>
    </dependency>

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>16.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.4.0</version>
    </dependency>

    <dependency>
      <groupId>json-mapreduce</groupId>
      <artifactId>json-mapreduce</artifactId>
      <version>1.0-SNAPSHOT</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>commons-io</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>commons-lang</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.7.7</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.7</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.4.0</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
    </dependency>
  </dependencies>
</project>

And 'mvn dependency:tree' produces the following output:



[INFO] Scanning for projects...
[INFO]

[INFO]

[INFO] Building bigcontent 1.0-SNAPSHOT
[INFO]

[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ bigcontent ---
[INFO] bigcontent:bigcontent:jar:1.0-SNAPSHOT
[INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.1.1:provided
[INFO] |  +- org.apache.spark:spark-core_2.10:jar:1.1.1:provided
[INFO] |  |  +- org.apache.curator:curator-recipes:jar:2.4.0:provided
[INFO] |  |  |  \- org.apache.curator:curator-framework:jar:2.4.0:provided
[INFO] |  |  | \- 

Re: Why does consuming a RESTful web service (using javax.ws.rs.* and Jsersey) work in unit test but not when submitted to Spark?

2014-12-24 Thread Emre Sevinc
It seems like YARN depends on an older version of Jersey, namely 1.9:

  https://github.com/apache/spark/blob/master/yarn/pom.xml

When I've modified my dependencies to have only:

  <dependency>
  <groupId>com.sun.jersey</groupId>
  <artifactId>jersey-core</artifactId>
  <version>1.9.1</version>
</dependency>

And then modified the code to use the old Jersey API:

Client c = Client.create();
WebResource r = c.resource("http://localhost:/rest")
 .path("annotate")
 .queryParam("text",
UrlEscapers.urlFragmentEscaper().escape(spotlightSubmission))
 .queryParam("confidence", 0.3);

logger.warn("!!! DEBUG !!! target: {}", r.getURI());

String response = r.accept(MediaType.APPLICATION_JSON_TYPE)
   //.header()
   .get(String.class);

logger.warn("!!! DEBUG !!! Spotlight response: {}", response);

It seems to work when I use spark-submit to submit the application that
includes this code.

Funny thing is, now my relevant unit test does not run, complaining about
not having enough memory:

Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0xc490, 25165824, 0) failed; error='Cannot
allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 25165824 bytes for
committing reserved memory.

--
Emre


On Wed, Dec 24, 2014 at 1:46 PM, Sean Owen so...@cloudera.com wrote:

 Your guess is right, that there are two incompatible versions of
 Jersey (or really, JAX-RS) in your runtime. Spark doesn't use Jersey,
 but its transitive dependencies may, or your transitive dependencies
 may.

 I don't see Jersey in Spark's dependency tree except from HBase tests,
 which in turn only appear in examples, so that's unlikely to be it.
 I'd take a look with 'mvn dependency:tree' on your own code first.
 Maybe you are including JavaEE 6 for example?

 On Wed, Dec 24, 2014 at 12:02 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:
  Hello,
 
  I have a piece of code that runs inside Spark Streaming and tries to get
  some data from a RESTful web service (that runs locally on my machine).
 The
  code snippet in question is:
 
   Client client = ClientBuilder.newClient();
   WebTarget target = client.target(http://localhost:/rest;);
   target = target.path(annotate)
   .queryParam(text,
  UrlEscapers.urlFragmentEscaper().escape(spotlightSubmission))
   .queryParam(confidence, 0.3);
 
logger.warn(!!! DEBUG !!! target: {},
 target.getUri().toString());
 
String response =
 
 target.request().accept(MediaType.APPLICATION_JSON_TYPE).get(String.class);
 
logger.warn(!!! DEBUG !!! Spotlight response: {}, response);
 
  When run inside a unit test as follows:
 
   mvn clean test -Dtest=SpotlightTest#testCountWords
 
  it contacts the RESTful web service and retrieves some data as expected.
 But
  when the same code is run as part of the application that is submitted to
  Spark, using spark-submit script I receive the following error:
 
java.lang.NoSuchMethodError:
 
 javax.ws.rs.core.MultivaluedMap.addAll(Ljava/lang/Object;[Ljava/lang/Object;)V
 
  I'm using Spark 1.1.0 and for consuming the web service I'm using Jersey
 in
  my project's pom.xml:
 
   <dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet-core</artifactId>
<version>2.14</version>
  </dependency>
 
  So I suspect that when the application is submitted to Spark, somehow
  there's a different JAR in the environment that uses a different version
 of
  Jersey / javax.ws.rs.*
 
  Does anybody know which version of Jersey / javax.ws.rs.*  is used in the
  Spark environment, or how to solve this conflict?
 
 
  --
  Emre Sevinç
  https://be.linkedin.com/in/emresevinc/
 




-- 
Emre Sevinc


Re: Why does consuming a RESTful web service (using javax.ws.rs.* and Jsersey) work in unit test but not when submitted to Spark?

2014-12-24 Thread Emre Sevinc
Sean,

Thanks a lot for the important information, especially  userClassPathFirst.

Cheers,
Emre

On Wed, Dec 24, 2014 at 3:38 PM, Sean Owen so...@cloudera.com wrote:

 That could well be it -- oops, I forgot to run with the YARN profile
 and so didn't see the YARN dependencies. Try the userClassPathFirst
 option to try to make your app's copy take precedence.

 The second error is really a JVM bug, but, is from having too little
 memory available for the unit tests.


 http://spark.apache.org/docs/latest/building-spark.html#setting-up-mavens-memory-usage

 On Wed, Dec 24, 2014 at 1:56 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:
   It seems like YARN depends on an older version of Jersey, namely 1.9:
 
https://github.com/apache/spark/blob/master/yarn/pom.xml
 
  When I've modified my dependencies to have only:
 
 <dependency>
 <groupId>com.sun.jersey</groupId>
 <artifactId>jersey-core</artifactId>
 <version>1.9.1</version>
  </dependency>
 
  And then modified the code to use the old Jersey API:
 
  Client c = Client.create();
  WebResource r = c.resource(http://localhost:/rest;)
   .path(annotate)
   .queryParam(text,
  UrlEscapers.urlFragmentEscaper().escape(spotlightSubmission))
   .queryParam(confidence, 0.3);
 
  logger.warn(!!! DEBUG !!! target: {}, r.getURI());
 
  String response = r.accept(MediaType.APPLICATION_JSON_TYPE)
 //.header()
 .get(String.class);
 
  logger.warn(!!! DEBUG !!! Spotlight response: {}, response);
 
  It seems to work when I use spark-submit to submit the application that
  includes this code.
 
  Funny thing is, now my relevant unit test does not run, complaining about
  not having enough memory:
 
  Java HotSpot(TM) 64-Bit Server VM warning: INFO:
  os::commit_memory(0xc490, 25165824, 0) failed; error='Cannot
  allocate memory' (errno=12)
  #
  # There is insufficient memory for the Java Runtime Environment to
 continue.
  # Native memory allocation (mmap) failed to map 25165824 bytes for
  committing reserved memory.
 
  --
  Emre
 
 
  On Wed, Dec 24, 2014 at 1:46 PM, Sean Owen so...@cloudera.com wrote:
 
  Your guess is right, that there are two incompatible versions of
  Jersey (or really, JAX-RS) in your runtime. Spark doesn't use Jersey,
  but its transitive dependencies may, or your transitive dependencies
  may.
 
  I don't see Jersey in Spark's dependency tree except from HBase tests,
  which in turn only appear in examples, so that's unlikely to be it.
  I'd take a look with 'mvn dependency:tree' on your own code first.
  Maybe you are including JavaEE 6 for example?
 
  On Wed, Dec 24, 2014 at 12:02 PM, Emre Sevinc emre.sev...@gmail.com
  wrote:
   Hello,
  
   I have a piece of code that runs inside Spark Streaming and tries to
 get
   some data from a RESTful web service (that runs locally on my
 machine).
   The
   code snippet in question is:
  
Client client = ClientBuilder.newClient();
WebTarget target = client.target(http://localhost:/rest;);
target = target.path(annotate)
.queryParam(text,
   UrlEscapers.urlFragmentEscaper().escape(spotlightSubmission))
.queryParam(confidence, 0.3);
  
 logger.warn(!!! DEBUG !!! target: {},
   target.getUri().toString());
  
 String response =
  
  
 target.request().accept(MediaType.APPLICATION_JSON_TYPE).get(String.class);
  
 logger.warn(!!! DEBUG !!! Spotlight response: {}, response);
  
   When run inside a unit test as follows:
  
mvn clean test -Dtest=SpotlightTest#testCountWords
  
   it contacts the RESTful web service and retrieves some data as
 expected.
   But
   when the same code is run as part of the application that is submitted
   to
   Spark, using spark-submit script I receive the following error:
  
 java.lang.NoSuchMethodError:
  
  
 javax.ws.rs.core.MultivaluedMap.addAll(Ljava/lang/Object;[Ljava/lang/Object;)V
  
   I'm using Spark 1.1.0 and for consuming the web service I'm using
 Jersey
   in
   my project's pom.xml:
  
 <dependency>
 <groupId>org.glassfish.jersey.containers</groupId>
 <artifactId>jersey-container-servlet-core</artifactId>
 <version>2.14</version>
   </dependency>
  
   So I suspect that when the application is submitted to Spark, somehow
   there's a different JAR in the environment that uses a different
 version
   of
   Jersey / javax.ws.rs.*
  
   Does anybody know which version of Jersey / javax.ws.rs.*  is used in
   the
   Spark environment, or how to solve this conflict?
  
  
   --
   Emre Sevinç
   https://be.linkedin.com/in/emresevinc/
  
 
 
 
 
  --
  Emre Sevinc




-- 
Emre Sevinc


Re: Unit testing and Spark Streaming

2014-12-12 Thread Emre Sevinc
On Fri, Dec 12, 2014 at 2:17 PM, Eric Loots eric.lo...@gmail.com wrote:
 How can the log level in test mode be reduced (or extended when needed) ?

Hello Eric,

The following might be helpful for reducing the log messages during unit
testing:

 http://stackoverflow.com/a/2736/236007
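
(In short, one common approach is to raise the log level for the noisy
packages from the test setup — a sketch, assuming log4j is the logging
backend in use, as in the Spark builds of that era; the class name is a
placeholder:)

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Before;

public class QuietSparkLogsTestBase {
  @Before
  public void quietSparkLogs() {
    // Only warnings and errors from Spark and Akka during tests.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
    Logger.getLogger("akka").setLevel(Level.WARN);
  }
}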

--
Emre Sevinç
https://be.linkedin.com/in/emresevinc


How can I make Spark Streaming count the words in a file in a unit test?

2014-12-08 Thread Emre Sevinc
Hello,

I've successfully built a very simple Spark Streaming application in Java
that is based on the HdfsCount example in Scala at
https://github.com/apache/spark/blob/branch-1.1/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
.

When I submit this application to my local Spark, it waits for a file to be
written to a given directory, and when I create that file it successfully
prints the number of words. I terminate the application by pressing Ctrl+C.

Now I've tried to create a very basic unit test for this functionality, but
in the test I was not able to print the same information, that is the
number of words.

What am I missing?

Below is the unit test file, and after that I've also included the code
snippet that shows the countWords method:

=
StarterAppTest.java
=
import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.junit.*;

import java.io.*;

public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
tempDir = Files.createTempDir();
tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
ssc.stop();
ssc = null;
  }

  @Test
  public void testInitialization() {
Assert.assertNotNull(ssc.sc());
  }


  @Test
  public void testCountWords() {

StarterApp starterApp = new StarterApp();

try {
  JavaDStream<String> lines =
ssc.textFileStream(tempDir.getAbsolutePath());
  JavaPairDStream<String, Integer> wordCounts =
starterApp.countWords(lines);

  System.err.println("= Word Counts ===");
  wordCounts.print();
  System.err.println("= Word Counts ===");

  ssc.start();

  File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
  PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
  writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
  writer.close();

  System.err.println("= Word Counts ===");
  wordCounts.print();
  System.err.println("= Word Counts ===");

} catch (FileNotFoundException e) {
  e.printStackTrace();
} catch (UnsupportedEncodingException e) {
  e.printStackTrace();
}


Assert.assertTrue(true);

  }

}
=

This test compiles and starts to run, and Spark Streaming prints a lot of
diagnostic messages on the console, but the calls to wordCounts.print()
do not print anything, whereas in StarterApp.java itself, they do.

I've also added ssc.awaitTermination(); after ssc.start(), but nothing
changed in that respect. After that I also tried to create a new file in
the directory that this Spark Streaming application was checking, but this
time it gave an error.
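
(For what it's worth, one pattern that makes the counts observable from a
test — a sketch/assumption rather than a verified fix: register an output
operation that collects each batch into an in-memory list before ssc.start(),
then wait for at least one batch interval and assert on the list. Extra
imports needed: java.util.*, org.apache.spark.api.java.JavaPairRDD,
org.apache.spark.api.java.function.Function, scala.Tuple2.)

// Inside testCountWords(), before ssc.start():
final List<Tuple2<String, Integer>> collected =
    Collections.synchronizedList(new ArrayList<Tuple2<String, Integer>>());

wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd) {
    collected.addAll(rdd.collect());  // runs on the driver for every batch
    return null;
  }
});

ssc.start();
// ... write tmp.txt as above, then give the 3-second batch time to complete
// (declare 'throws InterruptedException' on the test method):
Thread.sleep(2 * 3000);
Assert.assertFalse(collected.isEmpty());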

For completeness, below is the wordCounts method:


public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) { return new Tuple2(s, 1); }
        }).reduceByKey((i1, i2) -> i1 + i2);

    return wordCounts;
  }




Kind regards
Emre Sevinç


How can I compile only the core and streaming (so that I can get test utilities of streaming)?

2014-12-05 Thread Emre Sevinc
Hello,

I'm currently developing a Spark Streaming application and trying to write
my first unit test. I've used Java for this application, and I also need to
use Java (and JUnit) for writing the unit tests.

I could not find any documentation that focuses on unit testing Spark
Streaming; all I could find were the Java-based unit tests in the Spark
Streaming source code:


https://github.com/apache/spark/blob/branch-1.1/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java

that depends on a Scala file:


https://github.com/apache/spark/blob/branch-1.1/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala

which, in turn, depends on the Scala test files in


https://github.com/apache/spark/tree/branch-1.1/streaming/src/test/scala/org/apache/spark/streaming

So I thought that I could grab the Spark source code, switch to the
branch-1.1 branch, and then compile only the 'core' and 'streaming'
modules, hopefully ending up with the compiled classes (or jar files) of
the streaming test utilities, so that I can import them into my Java-based
Spark Streaming application.

However, trying to build it via the following command line failed:

mvn -pl core,streaming package

You can see the full output at the end of this message.

Any ideas how to progress?
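
As an alternative, I'm also wondering whether I could skip building Spark
myself and pull the streaming test utilities straight from Maven Central.
I have not verified that a 'tests'-classifier artifact is actually
published for this version, so the following dependency snippet is only a
guess on my part (the version number is a placeholder as well):

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.1.0</version>
      <classifier>tests</classifier>
      <scope>test</scope>
    </dependency>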

Full output of the build:

emre@emre-ubuntu:~/code/spark$ mvn -pl core,streaming package
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO]
[INFO] Spark Project Core
[INFO] Spark Project Streaming
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Spark Project Core 1.1.2-SNAPSHOT
[INFO] ------------------------------------------------------------------------
Downloading:
https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.7/maven-antrun-plugin-1.7.pom
Downloaded:
https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.7/maven-antrun-plugin-1.7.pom
(5 KB at 5.8 KB/sec)
Downloading:
https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.7/maven-antrun-plugin-1.7.jar
Downloaded:
https://repo1.maven.org/maven2/org/apache/maven/plugins/maven-antrun-plugin/1.7/maven-antrun-plugin-1.7.jar
(31 KB at 200.4 KB/sec)
Downloading:
https://repo1.maven.org/maven2/org/apache/commons/commons-math3/3.3/commons-math3-3.3.pom
Downloaded:
https://repo1.maven.org/maven2/org/apache/commons/commons-math3/3.3/commons-math3-3.3.pom
(24 KB at 178.9 KB/sec)
Downloading:
https://repo1.maven.org/maven2/org/spark-project/akka/akka-testkit_2.10/2.2.3-shaded-protobuf/akka-testkit_2.10-2.2.3-shaded-protobuf.pom
Downloaded:
https://repo1.maven.org/maven2/org/spark-project/akka/akka-testkit_2.10/2.2.3-shaded-protobuf/akka-testkit_2.10-2.2.3-shaded-protobuf.pom
(3 KB at 22.5 KB/sec)
Downloading:
https://repo1.maven.org/maven2/org/scala-lang/scalap/2.10.4/scalap-2.10.4.pom
Downloaded:
https://repo1.maven.org/maven2/org/scala-lang/scalap/2.10.4/scalap-2.10.4.pom
(2 KB at 19.2 KB/sec)
Downloading:
https://repo1.maven.org/maven2/org/apache/derby/derby/10.4.2.0/derby-10.4.2.0.pom
Downloaded:
https://repo1.maven.org/maven2/org/apache/derby/derby/10.4.2.0/derby-10.4.2.0.pom
(2 KB at 14.9 KB/sec)
Downloading:
https://repo1.maven.org/maven2/org/mockito/mockito-all/1.9.0/mockito-all-1.9.0.pom
Downloaded:
https://repo1.maven.org/maven2/org/mockito/mockito-all/1.9.0/mockito-all-1.9.0.pom
(1010 B at 4.1 KB/sec)
Downloading:
https://repo1.maven.org/maven2/org/easymock/easymockclassextension/3.1/easymockclassextension-3.1.pom
Downloaded:
https://repo1.maven.org/maven2/org/easymock/easymockclassextension/3.1/easymockclassextension-3.1.pom
(5 KB at 42.9 KB/sec)
Downloading:
https://repo1.maven.org/maven2/org/easymock/easymock-parent/3.1/easymock-parent-3.1.pom
Downloaded:
https://repo1.maven.org/maven2/org/easymock/easymock-parent/3.1/easymock-parent-3.1.pom
(13 KB at 133.8 KB/sec)
Downloading:
https://repo1.maven.org/maven2/org/easymock/easymock/3.1/easymock-3.1.pom
Downloaded:
https://repo1.maven.org/maven2/org/easymock/easymock/3.1/easymock-3.1.pom
(6 KB at 38.8 KB/sec)
Downloading:

Re: How can I compile only the core and streaming (so that I can get test utilities of streaming)?

2014-12-05 Thread Emre Sevinc
Hello,

Specifying '-DskipTests' on the command line worked, though I can't be sure
whether first running 'sbt assembly' also contributed to the solution.
(I tried 'sbt assembly' because branch-1.1's README says to use sbt.)
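
In case it helps anyone searching the archives later, the command line that
ended up working for me was, as far as I remember, along these lines:

    mvn -pl core,streaming -DskipTests package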

Thanks for the answer.

Kind regards,
Emre Sevinç