Re: Specify log4j properties file

2016-03-09 Thread Matt Narrell
You can also use --files, which doesn't require the file scheme.

On Wed, Mar 9, 2016 at 11:20 AM Ashic Mahtab  wrote:

> Found it.
>
> You can pass in the jvm parameter log4j.configuration. The following works:
>
> -Dlog4j.configuration=file:path/to/log4j.properties
>
> It doesn't work without the file: prefix though. Tested in 1.6.0.
>
> Cheers,
> Ashic.
>
> --
> From: as...@live.com
> To: user@spark.apache.org
> Subject: Specify log4j properties file
> Date: Wed, 9 Mar 2016 17:57:00 +
>
>
> Hello,
> Is it possible to provide a log4j properties file when submitting jobs to
> a cluster? I know that by default spark looks for a log4j.properties file
> in the conf directory. I'm looking for a way to specify a different
> log4j.properties file (external to the application) without pointing to a
> completely different conf directory. Is there a way to achieve this?
>
> Thanks,
> Ashic.
>


[Spark ML] HasInputCol, etc.

2015-07-28 Thread Matt Narrell
Hey,

Our ML ETL pipeline has several complex steps that I’d like to address with 
custom Transformers in an ML Pipeline.  Looking at the Tokenizer and HashingTF 
transformers, I see these handy traits (HasInputCol, HasLabelCol, HasOutputCol, 
etc.), but they have strict access modifiers.  How can I use these with custom 
Transformer/Estimator implementations?

I’m stuck depositing my implementations in org.apache.spark.ml, which is 
tolerable for now, but I’m wondering whether I’m missing some pattern.
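
A minimal sketch of the usual workaround, assuming the 1.4-era spark.ml API: 
declare your own inputCol/outputCol Params on the custom Transformer instead 
of reusing the package-private Has* traits.  The class name and the trivial 
column transformation below are illustrative, not from the original thread.

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical custom Transformer that lives in your own package and declares
// its own column Params rather than the private HasInputCol/HasOutputCol.
class CleanTextTransformer(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("cleanText"))

  val inputCol  = new Param[String](this, "inputCol", "input column name")
  val outputCol = new Param[String](this, "outputCol", "output column name")
  def setInputCol(value: String): this.type  = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  override def transform(df: DataFrame): DataFrame = {
    // Placeholder logic: trim and lower-case the input column.
    val clean = udf { s: String => s.trim.toLowerCase }
    df.withColumn($(outputCol), clean(df($(inputCol))))
  }

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+ StructField($(outputCol), StringType, nullable = true))

  override def copy(extra: ParamMap): CleanTextTransformer = defaultCopy(extra)
}

A transformer like this drops into a Pipeline next to Tokenizer and HashingTF 
without having to live under org.apache.spark.ml.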

Thanks,
mn



Re: Spark RDD join with CassandraRDD

2015-08-25 Thread Matt Narrell
I would suggest converting your RDDs to DataFrames (or SchemaRDDs, depending on 
your version) and performing a native join.
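
A rough sketch of that suggestion, assuming a 1.4-era SQLContext and the 
Cassandra connector’s CassandraRow accessors; the Booking fields are taken 
from the question below, and everything else is illustrative.

import com.datastax.spark.connector.CassandraRow
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}

case class Booking(bookingid: String, passengerName: String, contact: String)

// Join an RDD[Booking] with rows read from the Cassandra booking table by
// converting both sides to DataFrames and joining on the shared key.
def joinWithCassandra(sc: SparkContext,
                      bookings: RDD[Booking],
                      cassandraRows: RDD[CassandraRow]): DataFrame = {
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val bookingsDF = bookings.toDF()
  val cassandraDF = cassandraRows
    .map(row => (row.getString("bookingid"), row.getString("contact")))
    .toDF("bookingid", "cassandra_contact")

  bookingsDF.join(cassandraDF, "bookingid")   // native DataFrame join
}

A CassandraRDD[CassandraRow] can be passed directly where RDD[CassandraRow] is 
expected, since it is a subclass.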

mn

> On Aug 25, 2015, at 9:22 AM, Priya Ch  wrote:
> 
> Hi All, 
> 
>  I have the following scenario:
> 
>   There exists a booking table in cassandra, which holds fields like 
> bookingid, passengerName, contact, etc.
> 
> Now in my spark streaming application, there is one class Booking which acts 
> as a container and holds all the field details -
> 
> class Booking
> {
>val bookingid =...
>val passengerName = ...
>val contact = ...
>.
>.
>.
>.
> }
> 
> When a new booking message comes in, I populate the fields in the class, 
> which creates rdds of type RDD[Booking]. I then save this rdd to the 
> cassandra table Booking via rdd.saveToCassandra.
> 
> Let's say I query the booking table; I would get a CassandraRDD[CassandraRow].
> If I want to join RDD[Booking] with this CassandraRDD, how is that possible, 
> as these are two different kinds of rdds?
> 
> Would converting the CassandraRDD to RDD[CassandraRow] make things work?
> 
> Thanks,
> Padma Ch





Re: laziness in textFile reading from HDFS?

2015-10-03 Thread Matt Narrell
Is there any more information or are there best practices here?  I have the 
exact same issues when reading large data sets from HDFS (larger than available 
RAM), and I cannot run without setting the RDD persistence level to 
MEMORY_AND_DISK_SER and using nearly all the cluster resources.

Should I repartition this RDD to be equal to the number of cores?  

I notice that the job duration on the YARN UI is about 30 minutes longer than 
on the Spark UI.  Also, when the job initially starts, no tasks are shown in 
the Spark UI; is that expected?

All I’m doing is reading records from HDFS text files with sc.textFile and 
rewriting them back to HDFS grouped by a timestamp.
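
For context, a bare sketch of the pipeline described above; the paths and the 
timestamp extraction are placeholders, and the SparkContext is assumed to be 
configured via spark-submit.

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext()

// textFile yields roughly one partition per HDFS block; with
// MEMORY_AND_DISK_SER, partitions that don't fit in memory spill to local
// disk instead of forcing a recompute from HDFS.
val records = sc.textFile("hdfs:///data/in")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// Optionally spread the work over a chosen number of tasks:
// val records = sc.textFile("hdfs:///data/in").repartition(sc.defaultParallelism)

val byTimestamp = records
  .map(line => (line.split(",")(0), line))   // assume the timestamp is the first field
  .groupByKey()

byTimestamp.saveAsTextFile("hdfs:///data/out")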

Thanks,
mn

> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  wrote:
> 
> 1) It is not required to have the same amount of memory as data. 
> 2) By default the # of partitions are equal to the number of HDFS blocks
> 3) Yes, the read operation is lazy
> 4) It is okay to have more number of partitions than number of cores. 
> 
> Mohammed
> 
> -Original Message-
> From: davidkl [mailto:davidkl...@hotmail.com] 
> Sent: Monday, September 28, 2015 1:40 AM
> To: user@spark.apache.org
> Subject: laziness in textFile reading from HDFS?
> 
> Hello,
> 
> I need to process a significant amount of data every day, about 4TB. This 
> will be processed in batches of about 140GB. The cluster this will be running 
> on doesn't have enough memory to hold the dataset at once, so I am trying to 
> understand how this works internally.
> 
> When using textFile to read an HDFS folder (containing multiple files), I 
> understand that the number of partitions created are equal to the number of 
> HDFS blocks, correct? Are those created in a lazy way? I mean, if the number 
> of blocks/partitions is larger than the number of cores/threads the Spark 
> driver was launched with (N), are N partitions created initially and then the 
> rest when required? Or are all those partitions created up front?
> 
> I want to avoid reading the whole data into memory just to spill it out to 
> disk if there is no enough memory.
> 
> Thanks! 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textFile-reading-from-HDFS-tp24837.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
> commands, e-mail: user-h...@spark.apache.org
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 





Re: laziness in textFile reading from HDFS?

2015-10-06 Thread Matt Narrell
One.

I read in LZO compressed files from HDFS
Perform a map operation
cache the results of this map operation
call saveAsHadoopFile to write LZO back to HDFS.

Without the cache, the job will stall.  
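
A sketch of that read, map, persist, save shape; the LZO input/output specifics 
are omitted (GzipCodec stands in just to keep the example self-contained) and 
the map is a placeholder.

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext()

val mapped = sc.textFile("hdfs:///data/in")
  .map(line => line.toUpperCase)            // placeholder for the real map step
  .persist(StorageLevel.MEMORY_ONLY_SER)    // cached before the single save action

mapped.saveAsTextFile("hdfs:///data/out", classOf[GzipCodec])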

mn

> On Oct 5, 2015, at 7:25 PM, Mohammed Guller  wrote:
> 
> Is there any specific reason for caching the RDD? How many passes you make 
> over the dataset? 
> 
> Mohammed
> 
> -Original Message-----
> From: Matt Narrell [mailto:matt.narr...@gmail.com] 
> Sent: Saturday, October 3, 2015 9:50 PM
> To: Mohammed Guller
> Cc: davidkl; user@spark.apache.org
> Subject: Re: laziness in textFile reading from HDFS?
> 
> Is there any more information or best practices here?  I have the exact same 
> issues when reading large data sets from HDFS (larger than available RAM) and 
> I cannot run without setting the RDD persistence level to 
> MEMORY_AND_DISK_SER, and using nearly all the cluster resources.
> 
> Should I repartition this RDD to be equal to the number of cores?  
> 
> I notice that the job duration on the YARN UI is about 30 minutes longer than 
> the Spark UI.  When the job initially starts, there is no tasks shown in the 
> Spark UI..?
> 
> All I;m doing is reading records from HDFS text files with sc.textFile, and 
> rewriting them back to HDFS grouped by a timestamp.
> 
> Thanks,
> mn
> 
>> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  wrote:
>> 
>> 1) It is not required to have the same amount of memory as data. 
>> 2) By default the # of partitions are equal to the number of HDFS 
>> blocks
>> 3) Yes, the read operation is lazy
>> 4) It is okay to have more number of partitions than number of cores. 
>> 
>> Mohammed
>> 
>> -Original Message-
>> From: davidkl [mailto:davidkl...@hotmail.com]
>> Sent: Monday, September 28, 2015 1:40 AM
>> To: user@spark.apache.org
>> Subject: laziness in textFile reading from HDFS?
>> 
>> Hello,
>> 
>> I need to process a significant amount of data every day, about 4TB. This 
>> will be processed in batches of about 140GB. The cluster this will be 
>> running on doesn't have enough memory to hold the dataset at once, so I am 
>> trying to understand how this works internally.
>> 
>> When using textFile to read an HDFS folder (containing multiple files), I 
>> understand that the number of partitions created are equal to the number of 
>> HDFS blocks, correct? Are those created in a lazy way? I mean, if the number 
>> of blocks/partitions is larger than the number of cores/threads the Spark 
>> driver was launched with (N), are N partitions created initially and then 
>> the rest when required? Or are all those partitions created up front?
>> 
>> I want to avoid reading the whole data into memory just to spill it out to 
>> disk if there is no enough memory.
>> 
>> Thanks! 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textFi
>> le-reading-from-HDFS-tp24837.html Sent from the Apache Spark User List 
>> mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>> additional commands, e-mail: user-h...@spark.apache.org
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>> additional commands, e-mail: user-h...@spark.apache.org
>> 
> 





Re: laziness in textFile reading from HDFS?

2015-10-06 Thread Matt Narrell
Agreed.  This is Spark 1.2 on CDH 5.x.  How do you mitigate cases where the 
data sets are larger than available memory?

My jobs stall, and I get GC/heap issues all over the place.

..via mobile

> On Oct 6, 2015, at 4:44 PM, Mohammed Guller  wrote:
> 
> I have not used LZO compressed files from Spark, so not sure why it stalls 
> without caching. 
> 
> In general, if you are going to make just one pass over the data, there is 
> not much benefit in caching it. The data gets read anyway only after the 
> first action is called. If you are calling just a map operation and then a 
> save operation, I don't see how caching would help.
> 
> Mohammed
> 
> 
> -Original Message-
> From: Matt Narrell [mailto:matt.narr...@gmail.com] 
> Sent: Tuesday, October 6, 2015 3:32 PM
> To: Mohammed Guller
> Cc: davidkl; user@spark.apache.org
> Subject: Re: laziness in textFile reading from HDFS?
> 
> One.
> 
> I read in LZO compressed files from HDFS Perform a map operation cache the 
> results of this map operation call saveAsHadoopFile to write LZO back to HDFS.
> 
> Without the cache, the job will stall.  
> 
> mn
> 
>> On Oct 5, 2015, at 7:25 PM, Mohammed Guller  wrote:
>> 
>> Is there any specific reason for caching the RDD? How many passes you make 
>> over the dataset? 
>> 
>> Mohammed
>> 
>> -Original Message-
>> From: Matt Narrell [mailto:matt.narr...@gmail.com]
>> Sent: Saturday, October 3, 2015 9:50 PM
>> To: Mohammed Guller
>> Cc: davidkl; user@spark.apache.org
>> Subject: Re: laziness in textFile reading from HDFS?
>> 
>> Is there any more information or best practices here?  I have the exact same 
>> issues when reading large data sets from HDFS (larger than available RAM) 
>> and I cannot run without setting the RDD persistence level to 
>> MEMORY_AND_DISK_SER, and using nearly all the cluster resources.
>> 
>> Should I repartition this RDD to be equal to the number of cores?  
>> 
>> I notice that the job duration on the YARN UI is about 30 minutes longer 
>> than the Spark UI.  When the job initially starts, there is no tasks shown 
>> in the Spark UI..?
>> 
>> All I;m doing is reading records from HDFS text files with sc.textFile, and 
>> rewriting them back to HDFS grouped by a timestamp.
>> 
>> Thanks,
>> mn
>> 
>>> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  wrote:
>>> 
>>> 1) It is not required to have the same amount of memory as data. 
>>> 2) By default the # of partitions are equal to the number of HDFS 
>>> blocks
>>> 3) Yes, the read operation is lazy
>>> 4) It is okay to have more number of partitions than number of cores. 
>>> 
>>> Mohammed
>>> 
>>> -Original Message-
>>> From: davidkl [mailto:davidkl...@hotmail.com]
>>> Sent: Monday, September 28, 2015 1:40 AM
>>> To: user@spark.apache.org
>>> Subject: laziness in textFile reading from HDFS?
>>> 
>>> Hello,
>>> 
>>> I need to process a significant amount of data every day, about 4TB. This 
>>> will be processed in batches of about 140GB. The cluster this will be 
>>> running on doesn't have enough memory to hold the dataset at once, so I am 
>>> trying to understand how this works internally.
>>> 
>>> When using textFile to read an HDFS folder (containing multiple files), I 
>>> understand that the number of partitions created are equal to the number of 
>>> HDFS blocks, correct? Are those created in a lazy way? I mean, if the 
>>> number of blocks/partitions is larger than the number of cores/threads the 
>>> Spark driver was launched with (N), are N partitions created initially and 
>>> then the rest when required? Or are all those partitions created up front?
>>> 
>>> I want to avoid reading the whole data into memory just to spill it out to 
>>> disk if there is no enough memory.
>>> 
>>> Thanks! 
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textF
>>> i le-reading-from-HDFS-tp24837.html Sent from the Apache Spark User 
>>> List mailing list archive at Nabble.com.
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>>> additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
>>> additional commands, e-mail: user-h...@spark.apache.org
> 




Spark Streaming on YARN with loss of application master

2015-03-30 Thread Matt Narrell

I’m looking at various HA scenarios with Spark Streaming.  We’re currently 
running a Spark Streaming job that is intended to be long-lived, 24/7.  We see 
that if we kill node managers that are hosting Spark workers, new node managers 
assume execution of the jobs that were running on the stopped node manager.  
However, if we stop the node manager that is hosting the application master, we 
find that the job is marked as FINISHED.  Is this expected behavior?  I assumed 
that the resource manager would resubmit or migrate the application master to 
another available node.  We’re in an isolated testing situation here, so we’re 
assured that the cluster has enough resources to sustain outages and 
resubmissions.

I’m reading that Hadoop MapReduce has this functionality built into it, so I 
would think that there is some equivalent Spark driver configuration to allow 
for YARN resource managers to reschedule Spark application masters should a job 
not respond within some threshold.
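
For reference, a hedged sketch of the relevant knobs: YARN’s AM retry limit and 
Spark’s per-application cap.  Treat the property names and their availability 
in any particular release as an assumption, not something confirmed in this 
thread.

import org.apache.spark.SparkConf

// Assumption: spark.yarn.maxAppAttempts (Spark on YARN) bounds how many times
// the application master may be relaunched, and should not exceed YARN's
// global yarn.resourcemanager.am.max-attempts setting.
val conf = new SparkConf()
  .setAppName("long-lived-streaming-job")
  .set("spark.yarn.maxAppAttempts", "4")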

Thanks,
Matt



Re: spark streaming : what is the best way to make a driver highly available

2014-08-14 Thread Matt Narrell
I’d suggest something like Apache YARN, or Apache Mesos with Marathon or 
something similar to allow for management, in particular restart on failure.

mn

On Aug 13, 2014, at 7:15 PM, Tobias Pfeiffer  wrote:

> Hi,
> 
> On Thu, Aug 14, 2014 at 5:49 AM, salemi  wrote:
> what is the best way to make a spark streaming driver highly available.
> 
> I would also be interested in that. In particular for Streaming applications 
> where the Spark driver is running for a long time, this might be important, I 
> think.
> 
> Thanks
> Tobias
> 



spark-submit with HA YARN

2014-08-18 Thread Matt Narrell
Hello,

I have an HA-enabled YARN cluster with two resource managers.  When submitting 
jobs via “spark-submit --master yarn-cluster”, it appears that the driver looks 
explicitly for the “yarn.resourcemanager.address” property rather than 
round-robining through the resource managers via the 
“yarn.client.failover-proxy-provider” property set to 
“org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider”.

If I explicitly set “yarn.resourcemanager.address” to the active resource 
manager, jobs submit fine.

Is there a way to make “spark-submit --master yarn-cluster” respect the 
failover proxy?

Thanks in advance,
Matt



Re: spark-submit with HA YARN

2014-08-20 Thread Matt Narrell
Yes, I’m pretty sure my YARN and HDFS HA configuration is correct.  I can use 
the UIs and HDFS command line tools with HA support as expected (failing over 
namenodes and resource managers, etc.), so I believe this to be a Spark issue.

Like I mentioned earlier, if I manipulate “yarn.resourcemanager.address” to 
reflect the active resource manager, things work as expected, although that 
would not be an HA setup…

An “unaccepted” reply to this thread from Dean Chen suggested building Spark 
against a newer version of Hadoop (2.4.1), and this has worked to some extent.  
I’m now able to submit jobs (omitting an explicit “yarn.resourcemanager.address” 
property), and the ConfiguredRMFailoverProxyProvider seems to submit them to 
whichever resource manager is active.  Thanks, Dean!

However, the Spark jobs running in the ApplicationMaster on a given node now 
fail to find the active resource manager.  Below is a log excerpt from one of 
the assigned nodes.  As all the jobs fail, YARN eventually moves the 
application to the node that co-locates the active resource manager and a node 
manager, where the job proceeds a bit further.  Then the Spark job itself fails 
while attempting to access HDFS files via the virtualized HA HDFS URI.

I’m running Apache Spark 1.0.2 built against Hadoop 2.4.1.  Is it verified that 
Spark is ready for HA YARN/HDFS?

===
14/08/20 11:34:23 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
14/08/20 11:34:24 INFO ApplicationMaster: ApplicationAttemptId: 
appattempt_1408548063882_0002_01
14/08/20 11:34:24 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8030
14/08/20 11:34:24 INFO SecurityManager: Changing view acls to: hadoop
14/08/20 11:34:24 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop)
14/08/20 11:34:24 INFO ApplicationMaster: Starting the user JAR in a separate 
Thread
14/08/20 11:34:24 INFO ApplicationMaster: Waiting for Spark context 
initialization
14/08/20 11:34:24 INFO ApplicationMaster: Waiting for Spark context 
initialization ... 0
14/08/20 11:34:24 WARN SparkConf: In Spark 1.0 and later spark.local.dir will 
be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone and LOCAL_DIRS in YARN).
14/08/20 11:34:25 INFO Slf4jLogger: Slf4jLogger started
14/08/20 11:34:25 INFO Remoting: Starting remoting
14/08/20 11:34:25 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sp...@ip-10-0-5-106.us-west-2.compute.internal:41419]
14/08/20 11:34:25 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://sp...@ip-10-0-5-106.us-west-2.compute.internal:41419]
14/08/20 11:34:27 INFO Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8030. Already tried 0 time(s); retry policy is 
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
14/08/20 11:34:28 INFO Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8030. Already tried 1 time(s); retry policy is 
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
14/08/20 11:34:29 INFO Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8030. Already tried 2 time(s); retry policy is 
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
14/08/20 11:34:30 INFO Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8030. Already tried 3 time(s); retry policy is 
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
14/08/20 11:34:31 INFO Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8030. Already tried 4 time(s); retry policy is 
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
14/08/20 11:34:32 INFO Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8030. Already tried 5 time(s); retry policy is 
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
14/08/20 11:34:33 INFO Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8030. Already tried 6 time(s); retry policy is 
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
14/08/20 11:34:34 INFO Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8030. Already tried 7 time(s); retry policy is 
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)


On Aug 19, 2014, at 5:34 PM, Sandy Ryza  wrote:

> Hi Matt,
> 
> I checked in the YARN code and I don't see any references to 
> yarn.resourcemanager.address.  Have you made sure that your YARN client 
> configuration on the node you're launching from contains the right configs?
> 
> -Sandy  
> 
> 
> On Mon, Aug 18, 2014 at 4:07 PM, Matt Narrell  wrote:
> Hello,
> 
> I have an HA enabled YARN cluster with two resource mangers.  When submitting 
> jobs via “spark-submit —master yarn-cluster”.  It appe

Re: spark-submit with HA YARN

2014-08-20 Thread Matt Narrell
Marcelo,

Specifying the driver-class-path yields behavior like 
https://issues.apache.org/jira/browse/SPARK-2420 and 
https://issues.apache.org/jira/browse/SPARK-2848.  It feels like opening a can 
of worms here if I also need to replace the Guava dependencies.

Wouldn’t calling “./make-distribution.sh --skip-java-test --hadoop 2.4.1 
--with-yarn --tgz” include the appropriate versions of the Hadoop libs in the 
Spark jar?

I’m trying to rebuild using the hadoop-provided profile, but I’m getting 
several build errors.  Is this sufficient:  mvn -Phadoop-provided clean package 
-Phadoop-2.4 -Pyarn -Dyarn.version=2.4.1 -Dhadoop.version=2.4.1 -DskipTests

Or am I missing something completely?  What is the time frame for resolving the 
above JIRA issues?

mn

On Aug 20, 2014, at 11:25 AM, Marcelo Vanzin  wrote:

> Ah, sorry, forgot to talk about the second issue.
> 
> On Wed, Aug 20, 2014 at 8:54 AM, Matt Narrell  wrote:
>> However, now the Spark jobs running in the ApplicationMaster on a given node
>> fails to find the active resourcemanager.  Below is a log excerpt from one
>> of the assigned nodes.  As all the jobs fail, eventually YARN will move this
>> to execute on the node that co-locates the active resourcemanager and a
>> nodemanager, where the job will proceed a bit further.  Then, the Spark job
>> itself will fail attempting to access HDFS files via the virtualized HA HDFS
>> URI.
> 
>> 14/08/20 11:34:27 INFO Client: Retrying connect to server:
>> 0.0.0.0/0.0.0.0:8030. Already tried 0 time(s); retry policy is
>> RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
>> MILLISECONDS)
> 
> Here it seems you're running into the same issue but on the AM side.
> If you're comfortable with using a custom build of Spark, I'd
> recommend building it with the "-Phadoop-provided" profile enabled
> (note: I think that only works with maven currently). That way the
> Spark assembly does not include the Hadoop classes.
> 
> A quick look at the code seems to indicate the Spark assembly is
> always added before the local Hadoop jars in the classpath.
> Unfortunately there's no workaround for that at the moment (aside from
> the above built-time fix), although we might be able to do something
> when SPARK-2848 is fixed.
> 
> -- 
> Marcelo



Re: spark-submit with HA YARN

2014-08-20 Thread Matt Narrell
Ok Marcelo,

Thanks for the quick and thorough replies.  I’ll keep an eye on these tickets 
and the mailing list to see how things move along.

mn

On Aug 20, 2014, at 1:33 PM, Marcelo Vanzin  wrote:

> Hi,
> 
> On Wed, Aug 20, 2014 at 11:59 AM, Matt Narrell  wrote:
>> Specifying the driver-class-path yields behavior like
>> https://issues.apache.org/jira/browse/SPARK-2420 and
>> https://issues.apache.org/jira/browse/SPARK-2848  It feels like opening a
>> can of worms here if I also need to replace the guava dependencies.
> 
> There's a PR open for the Guava dependency. I'm not sure at the moment
> if there will be a need to shade any other dependencies.
> 
>> Wouldn’t calling “./make-distribution.sh —skip-java-test —hadoop 2.4.1
>> —with-yarn —tgz” include the appropriate versions of the hadoop libs into
>> the spark jar?
> 
> Yes. I was suggesting a workaround that just excludes any Hadoop
> classes from your Spark jar; that way you're guaranteed to use
> whatever Hadoop classes you have in your cluster. :-)
> 
>> I’m trying to rebuild using the hadoop-provided profile, but I’m getting
>> several build errors.  Is this sufficient:  mvm -Phadoop-provided clean
>> package -Phadoop-2.4 -Pyarn -Dyarn.version=2.4.1 -Dhadoop.version=2.4.1
>> -DskipTests
> 
> Hmm, I'm pretty sure that's worked for me in the past, but then I
> haven't tried that in a while.
> 
> -- 
> Marcelo





Re: Spark on YARN question

2014-09-02 Thread Matt Narrell
I’ve put my Spark JAR into HDFS and set the SPARK_JAR variable to point to its 
HDFS location.  I’m not using any specialized configuration files (like 
spark-env.sh); rather, I set things either by environment variable per node, by 
passing application arguments to the job, or by making a Zookeeper connection 
from my job to seed properties.  From there, I can construct a SparkConf as 
necessary.
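
A small sketch of that pattern; the property key and the Zookeeper lookup 
helper are made up for illustration.

import org.apache.spark.SparkConf

// `lookupFromZookeeper` is a hypothetical stand-in for whatever Zookeeper
// client the job already uses to seed properties.
def lookupFromZookeeper(path: String, default: String): String = default

def buildConf(args: Array[String]): SparkConf = {
  new SparkConf()
    .setAppName(if (args.nonEmpty) args(0) else "my-yarn-job")
    .set("spark.executor.memory",
         sys.env.getOrElse("SPARK_EXECUTOR_MEMORY", "2g"))   // per-node environment variable
    .set("spark.myapp.seeded.property",                      // made-up application property
         lookupFromZookeeper("/config/myapp/property", "default-value"))
}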

mn

On Sep 2, 2014, at 9:06 AM, Greg Hill  wrote:

> I'm working on setting up Spark on YARN using the HDP technical preview - 
> http://hortonworks.com/kb/spark-1-0-1-technical-preview-hdp-2-1-3/
> 
> I have installed the Spark JARs on all the slave nodes and configured YARN to 
> find the JARs.  It seems like everything is working.
> 
> Unless I'm misunderstanding, it seems like there isn't any configuration 
> required on the YARN slave nodes at all, apart from telling YARN where to 
> find the Spark JAR files.  Do the YARN processes even pick up local Spark 
> configuration files on the slave nodes, or is that all just pulled in on the 
> client and passed along to YARN?
> 
> Greg



Serialized 3rd party libs

2014-09-02 Thread Matt Narrell
Hello,

I’m using Spark Streaming to aggregate data from a Kafka topic in sliding 
windows.  Usually we want to persist this aggregated data to a MongoDB cluster, 
or republish it to a different Kafka topic.  When I include these third-party 
drivers, I usually get a NotSerializableException due to the parallelization of 
the job.  To sidestep this, I’ve used static class variables, which seem to 
help, e.g., I can run my jobs.

Is this the proper way to provide third-party libs to Spark jobs?
Does having these drivers declared as static prohibit me from parallelizing my 
job?
Is this even a proper way to design jobs?

An alternative (I assume) would be to aggregate my data into HDFS and have 
another process (perhaps non-Spark?) consume it and republish/persist it.

Thanks,
Matt



Re: Serialized 3rd party libs

2014-09-02 Thread Matt Narrell
Sean,

Thanks for pointing this out.  I’d have to experiment with the mapPartitions 
method, but you’re right, this seems to address the issue directly.  I’m also 
connecting to Zookeeper to retrieve SparkConf parameters.  I run into the same 
issue with my Zookeeper driver; however, that happens before any Spark contexts 
are created, and I assume before the job is partitioned.
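
A minimal sketch of the per-partition pattern from the reply quoted below, so 
the connection is opened on the executor instead of being serialized from the 
driver; the connection trait and factory here are placeholders for the real 
MongoDB or Kafka client.

import org.apache.spark.streaming.dstream.DStream

// Hypothetical sink abstraction; only the placement of openConnection() matters.
trait SinkConnection { def write(record: String): Unit; def close(): Unit }
def openConnection(): SinkConnection = ???   // placeholder factory

def publish(aggregated: DStream[String]): Unit = {
  aggregated.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // Created here, on the executor, so nothing non-serializable crosses the wire.
      val conn = openConnection()
      try records.foreach(conn.write) finally conn.close()
    }
  }
}

When the connection is expensive, a lazily initialized per-executor singleton 
is the usual next step; that is the disciplined version of the static-variable 
trick.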

mn

On Sep 2, 2014, at 11:00 AM, Sean Owen  wrote:

> The problem is not using the drivers per se, but writing your
> functions in a way that you are trying to serialize them. You can't
> serialize them, and indeed don't want to.  Instead your code needs to
> reopen connections and so forth when the function is instantiated on
> the remote worker.
> 
> static variables are a crude way to do that, probably too crude in general.
> No, there's certainly no reason you can't access these things in Spark.
> 
> Since it answers exactly this point, I don't mind promoting today's blog post:
> http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
> ... which repeats Tobias's good formulation of how to deal with things
> like drivers in an efficient way that doesn't trip over serialization.
> 
> On Tue, Sep 2, 2014 at 5:45 PM, Matt Narrell  wrote:
>> Hello,
>> 
>> I’m using Spark streaming to aggregate data from a Kafka topic in sliding 
>> windows.  Usually we want to persist this aggregated data to a MongoDB 
>> cluster, or republish to a different Kafka topic.  When I include these 3rd 
>> party drivers, I usually get a NotSerializableException due to the 
>> parallelization of the job.  To side step this, I’ve used static class 
>> variables which seem to help, e.g., I can run my jobs.
>> 
>> Is this the proper way to provide 3rd party libs to Spark jobs?
>> Does having these drivers declared as static prohibit me from parallelizing 
>> my job?
>> Is this even a proper way to design jobs?
>> 
>> An alternative (I assume) would be to aggregate my data into HDFS and have 
>> another process (perhaps non-Spark?) consume it and republish/persist?
>> 
>> Thanks,
>> Matt
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 





Re: Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow

2014-09-08 Thread Matt Narrell
I came across this:  https://github.com/xerial/sbt-pack

Until I found this, I was simply using the sbt-assembly plugin (sbt clean 
assembly).

mn

On Sep 4, 2014, at 2:46 PM, Aris  wrote:

> Thanks for answering Daniil - 
> 
> I have SBT version 0.13.5, is that an old version? Seems pretty up-to-date.
> 
> It turns out I figured out a way around this entire problem: just use 'sbt 
> package', and when using bin/spark-submit, pass it the "--jars" option and 
> GIVE IT ALL THE JARS from the local iv2 cache. Pretty inelegant, but at least 
> I am able to develop, and when I want to make a super JAR with sbt assembly I 
> can use the stupidly slow method.
> 
> Here is the important snippet for grabbing all the JARs for the local cache 
> of ivy2 :
> 
>  --jars $(find ~/.ivy2/cache/ -iname *.jar | tr '\n' ,) 
> 
> Here's the entire running command  - 
> 
> bin/spark-submit --master local[*] --jars $(find /home/data/.ivy2/cache/ 
> -iname *.jar | tr '\n' ,) --class KafkaStreamConsumer 
> ~/code_host/data/scala/streamingKafka/target/scala-2.10/streamingkafka_2.10-1.0.jar
>  node1:2181 my-consumer-group aris-topic 1
> 
> This is fairly bad, but it works around sbt assembly being incredibly slow
> 
> 
> On Tue, Sep 2, 2014 at 2:13 PM, Daniil Osipov  
> wrote:
> What version of sbt are you using? There is a bug in early version of 0.13 
> that causes assembly to be extremely slow - make sure you're using the latest 
> one.
> 
> 
> On Fri, Aug 29, 2014 at 1:30 PM, Aris <> wrote:
> Hi folks,
> 
> I am trying to use Kafka with Spark Streaming, and it appears I cannot do the 
> normal 'sbt package' as I do with other Spark applications, such as Spark 
> alone or Spark with MLlib. I learned I have to build with the sbt-assembly 
> plugin.
> 
> OK, so here is my build.sbt file for my extremely simple test Kafka/Spark 
> Streaming project. It Takes almost 30 minutes to build! This is a Centos 
> Linux machine on SSDs with 4GB of RAM, it's never been slow for me. To 
> compare, sbt assembly for the entire Spark project itself takes less than 10 
> minutes.
> 
> At the bottom of this file I am trying to play with 'cacheOutput' options, 
> because I read online that maybe I am calculating SHA-1 for all the *.class 
> files in this super JAR. 
> 
> I also copied the mergeStrategy from Spark contributor TD Spark Streaming 
> tutorial from Spark Summit 2014.
> 
> Again, is there some better way to build this JAR file, just using sbt 
> package? This is process is working, but very slow.
> 
> Any help with speeding up this compilation is really appreciated!!
> 
> Aris
> 
> -
> 
> import AssemblyKeys._ // put this at the top of the file
> 
> name := "streamingKafka"
> 
> version := "1.0"
> 
> scalaVersion := "2.10.4"
> 
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
>   "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided",
>   "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
> )
> 
> assemblySettings
> 
> jarName in assembly := "streamingkafka-assembly.jar"
> 
> mergeStrategy in assembly := {
>   case m if m.toLowerCase.endsWith("manifest.mf")  => 
> MergeStrategy.discard
>   case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  => 
> MergeStrategy.discard
>   case "log4j.properties"  => 
> MergeStrategy.discard
>   case m if m.toLowerCase.startsWith("meta-inf/services/") => 
> MergeStrategy.filterDistinctLines
>   case "reference.conf"=> 
> MergeStrategy.concat
>   case _   => 
> MergeStrategy.first
> }
> 
> assemblyOption in assembly ~= { _.copy(cacheOutput = false) }
> 
> 
> 



Multiple Kafka Receivers and Union

2014-09-23 Thread Matt Narrell
Hey,

Spark 1.1.0
Kafka 0.8.1.1
Hadoop (YARN/HDFS) 2.5.1

I have a five-partition Kafka topic.  I can create a single Kafka receiver via 
KafkaUtils.createStream with five threads in the topic map and consume messages 
fine.  Sifting through the user list and Google, I see that it’s possible to 
split the Kafka receivers among the Spark workers such that I have a receiver 
per partition, and have these distributed to the workers rather than localized 
to the driver.  I’m following something like this (but for Kafka, obviously):  
https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
From the Streaming Programming Guide: “Receiving multiple data streams can 
therefore be achieved by creating multiple input DStreams and configuring them 
to receive different partitions of the data stream from the source(s).”

However, I’m not able to consume any messages from Kafka after I perform the 
union operation.  Again, if I create a single, multi-threaded receiver, I can 
consume messages fine.  If I create 5 receivers in a loop and call 
jssc.union(…) I get:

INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks

Do I need to do anything to the unioned DStream?  Am I going about this 
incorrectly?

Thanks in advance.

Matt

Re: Multiple Kafka Receivers and Union

2014-09-23 Thread Matt Narrell
So, this is scrubbed some for confidentiality, but the meat of it is as 
follows.  Note that if I substitute the commented section for the loop, I 
receive messages from the topic.

SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.streaming.unpersist", "true");
sparkConf.set("spark.logConf", "true");

Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
kafkaProps.put("group.id", groupId);

JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
jsc.checkpoint("hdfs://");

List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);

for (int i = 0; i < 5; i++) {
    streamList.add(KafkaUtils.createStream(jsc,
                                           String.class, ProtobufModel.class,
                                           StringDecoder.class, ProtobufModelDecoder.class,
                                           kafkaProps,
                                           Collections.singletonMap(topic, 1),
                                           StorageLevel.MEMORY_ONLY_SER()));
}

final JavaPairDStream<String, ProtobufModel> stream =
        jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));

//  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
//          KafkaUtils.createStream(jsc,
//                                  String.class, ProtobufModel.class,
//                                  StringDecoder.class, ProtobufModelDecoder.class,
//                                  kafkaProps,
//                                  Collections.singletonMap(topic, 5),
//                                  StorageLevel.MEMORY_ONLY_SER());

final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
        new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
                return new Tuple2<>(tuple._2().getDeviceId(), 1);
            }
        });

… and further Spark functions ...

On Sep 23, 2014, at 2:55 PM, Tim Smith  wrote:

> Posting your code would be really helpful in figuring out gotchas.
> 
> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell  wrote:
>> Hey,
>> 
>> Spark 1.1.0
>> Kafka 0.8.1.1
>> Hadoop (YARN/HDFS) 2.5.1
>> 
>> I have a five partition Kafka topic.  I can create a single Kafka receiver
>> via KafkaUtils.createStream with five threads in the topic map and consume
>> messages fine.  Sifting through the user list and Google, I see that its
>> possible to split the Kafka receiver among the Spark workers such that I can
>> have a receiver per topic, and have this distributed to workers rather than
>> localized to the driver.  I’m following something like this:
>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>> But for Kafka obviously.  From the Streaming Programming Guide “ Receiving
>> multiple data streams can therefore be achieved by creating multiple input
>> DStreams and configuring them to receive different partitions of the data
>> stream from the source(s)."
>> 
>> However, I’m not able to consume any messages from Kafka after I perform the
>> union operation.  Again, if I create a single, multi-threaded, receiver I
>> can consume messages fine.  If I create 5 receivers in a loop, and call
>> jssc.union(…) i get:
>> 
>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>> 
>> 
>> Do I need to do anything to the unioned DStream?  Am I going about this
>> incorrectly?
>> 
>> Thanks in advance.
>> 
>> Matt
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 





Re: Multiple Kafka Receivers and Union

2014-09-23 Thread Matt Narrell
To my eyes, these are functionally equivalent.  I’ll try a Scala approach, but 
this may cause waves for me upstream (e.g., non-Java).

Thanks for looking at this.  If anyone else can see a glaring issue in the Java 
approach, that would be appreciated.

Thanks,
Matt

On Sep 23, 2014, at 4:13 PM, Tim Smith  wrote:

> Sorry, I am almost Java illiterate but here's my Scala code to do the
> equivalent (that I have tested to work):
> 
> val kInStreams = (1 to 10).map{_ =>
> KafkaUtils.createStream(ssc,zkhost.acme.net:2182,"myGrp",Map("myTopic"
> -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
> across the cluster, one for each partition, potentially but active
> receivers are only as many kafka partitions you have
> 
> val kInMsg = 
> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
> 
> 
> 
> 
> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell  wrote:
>> So, this is scrubbed some for confidentiality, but the meat of it is as 
>> follows.  Note, that if I substitute the commented section for the loop, I 
>> receive messages from the topic.
>> 
>> SparkConf sparkConf = new SparkConf();
>> sparkConf.set("spark.streaming.unpersist", "true");
>> sparkConf.set("spark.logConf", "true");
>> 
>> Map kafkaProps = new HashMap<>();
>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>> kafkaProps.put("group.id", groupId);
>> 
>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
>> Seconds.apply(1));
>> jsc.checkpoint("hdfs://");
>> 
>> List> streamList = new ArrayList<>(5);
>> 
>> for (int i = 0; i < 5; i++) {
>>streamList.add(KafkaUtils.createStream(jsc,
>>   String.class, ProtobufModel.class,
>>   StringDecoder.class, 
>> ProtobufModelDecoder.class,
>>   kafkaProps,
>>   Collections.singletonMap(topic, 1),
>>   StorageLevel.MEMORY_ONLY_SER()));
>> }
>> 
>> final JavaPairDStream stream = 
>> jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>> 
>> //  final JavaPairReceiverInputDStream stream =
>> //  KafkaUtils.createStream(jsc,
>> //  String.class, 
>> ProtobufModel.class,
>> //  StringDecoder.class, 
>> ProtobufModelDecoder.class,
>> //  kafkaProps,
>> //  Collections.singletonMap(topic, 
>> 5),
>> //  StorageLevel.MEMORY_ONLY_SER());
>> 
>> final JavaPairDStream tuples = stream.mapToPair(
>>new PairFunction, String, Integer>() {
>>@Override
>>public Tuple2 call(Tuple2 
>> tuple) throws Exception {
>>return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>}
>>});
>> 
>> … and futher Spark functions ...
>> 
>> On Sep 23, 2014, at 2:55 PM, Tim Smith  wrote:
>> 
>>> Posting your code would be really helpful in figuring out gotchas.
>>> 
>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell  
>>> wrote:
>>>> Hey,
>>>> 
>>>> Spark 1.1.0
>>>> Kafka 0.8.1.1
>>>> Hadoop (YARN/HDFS) 2.5.1
>>>> 
>>>> I have a five partition Kafka topic.  I can create a single Kafka receiver
>>>> via KafkaUtils.createStream with five threads in the topic map and consume
>>>> messages fine.  Sifting through the user list and Google, I see that its
>>>> possible to split the Kafka receiver among the Spark workers such that I 
>>>> can
>>>> have a receiver per topic, and have this distributed to workers rather than
>>>> localized to the driver.  I’m following something like this:
>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>> But for Kafka obviously.  From the Streaming Programming Guide “ Receiving
>>>> multiple data streams can therefore be achieved by creating multiple input
>>>> DStreams and configuring them to receive different partitions of the data
>>>&g

Re: Multiple Kafka Receivers and Union

2014-09-24 Thread Matt Narrell
The part that works is the commented-out, single-receiver stream below the 
loop.  It seems that when I call KafkaUtils.createStream more than once, I 
don’t receive any messages.

I’ll dig through the logs, but at first glance yesterday I didn’t see anything 
suspect.  I’ll have to look closer.

mn

On Sep 23, 2014, at 6:14 PM, Tim Smith  wrote:

> Maybe post the before-code as in what was the code before you did the
> loop (that worked)? I had similar situations where reviewing code
> before (worked) and after (does not work) helped. Also, what helped is
> the Scala REPL because I can see what are the object types being
> returned by each statement.
> 
> Other than code, in the driver logs, you should see events that say
> "Registered receiver for stream 0 from
> akka.tcp://sp...@node5.acme.net:53135"
> 
> Now, if you goto "node5" and look at Spark or YarnContainer logs
> (depending on who's doing RM), you should be able to see if the
> receiver has any errors when trying to talk to kafka.
> 
> 
> 
> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell  wrote:
>> To my eyes, these are functionally equivalent.  I’ll try a Scala approach, 
>> but this may cause waves for me upstream (e.g., non-Java)
>> 
>> Thanks for looking at this.  If anyone else can see a glaring issue in the 
>> Java approach that would be appreciated.
>> 
>> Thanks,
>> Matt
>> 
>> On Sep 23, 2014, at 4:13 PM, Tim Smith  wrote:
>> 
>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>> equivalent (that I have tested to work):
>>> 
>>> val kInStreams = (1 to 10).map{_ =>
>>> KafkaUtils.createStream(ssc,zkhost.acme.net:2182,"myGrp",Map("myTopic"
>>> -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
>>> across the cluster, one for each partition, potentially but active
>>> receivers are only as many kafka partitions you have
>>> 
>>> val kInMsg = 
>>> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell  
>>> wrote:
>>>> So, this is scrubbed some for confidentiality, but the meat of it is as 
>>>> follows.  Note, that if I substitute the commented section for the loop, I 
>>>> receive messages from the topic.
>>>> 
>>>> SparkConf sparkConf = new SparkConf();
>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>> sparkConf.set("spark.logConf", "true");
>>>> 
>>>> Map kafkaProps = new HashMap<>();
>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>> kafkaProps.put("group.id", groupId);
>>>> 
>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
>>>> Seconds.apply(1));
>>>> jsc.checkpoint("hdfs://");
>>>> 
>>>> List> streamList = new 
>>>> ArrayList<>(5);
>>>> 
>>>> for (int i = 0; i < 5; i++) {
>>>>   streamList.add(KafkaUtils.createStream(jsc,
>>>>  String.class, ProtobufModel.class,
>>>>  StringDecoder.class, 
>>>> ProtobufModelDecoder.class,
>>>>  kafkaProps,
>>>>  Collections.singletonMap(topic, 
>>>> 1),
>>>>  StorageLevel.MEMORY_ONLY_SER()));
>>>> }
>>>> 
>>>> final JavaPairDStream stream = 
>>>> jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>> 
>>>> //  final JavaPairReceiverInputDStream stream =
>>>> //  KafkaUtils.createStream(jsc,
>>>> //  String.class, 
>>>> ProtobufModel.class,
>>>> //      StringDecoder.class, 
>>>> ProtobufModelDecoder.class,
>>>> //  kafkaProps,
>>>> //  
>>>> Collections.singletonMap(topic, 5),
>>>> //  
>>>> StorageLevel.MEMORY_ONLY_SER());
>>>> 
>>>> final JavaPairDStream tuples = stream.mapToPair(
>>>>   new PairFun

Re: Does Spark Driver works with HDFS in HA mode

2014-09-24 Thread Matt Narrell
Yes, this works.  Make sure you have HADOOP_CONF_DIR set on your Spark machines.

mn

On Sep 24, 2014, at 5:35 AM, Petr Novak  wrote:

> Hello,
> if our Hadoop cluster is configured with HA and "fs.defaultFS" points to a 
> namespace instead of a namenode hostname - hdfs:/// - then 
> our Spark job fails with exception. Is there anything to configure or it is 
> not implemented?
> 
> 
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: 
> Lost task 0.3 in stage 1.0 (TID 4, ): 
> 
> 
> 
> java.lang.IllegalArgumentException: java.net.UnknownHostException: 
> 
> 
> 
> 
> Many thanks,
> P.
> 



Re: Spark with YARN

2014-09-24 Thread Matt Narrell
This just shows the driver.  Click the Executors tab in the Spark UI.

mn

On Sep 24, 2014, at 11:25 AM, Raghuveer Chanda  
wrote:

> Hi,
> 
> I'm new to spark and facing problem with running a job in cluster using YARN.
> 
> Initially i ran jobs using spark master as --master spark://dml2:7077 and it 
> is running fine on 3 workers.
> 
> But now I'm shifting to YARN, so I installed YARN via Cloudera on a 3-node 
> cluster and changed the master to yarn-cluster, but it is not working.  I 
> attached screenshots of the UI, which is not progressing and just hanging.
> 
> Output on terminal :
> 
> This error is repeating 
> 
> ./spark-submit --class "class-name" --master yarn-cluster --num-executors 3 
> --executor-cores 3  jar-with-dependencies.jar
> 
> 
> Do I need to configure YARN, or why is it not getting all the workers? 
> Please help.
> 
> 
> 14/09/24 22:44:21 INFO yarn.Client: Application report from ASM: 
>application identifier: application_1411578463780_0001
>appId: 1
>clientToAMToken: null
>appDiagnostics: 
>appMasterHost: dml3
>appQueue: root.chanda
>appMasterRpcPort: 0
>appStartTime: 1411578513545
>yarnAppState: RUNNING
>distributedFinalState: UNDEFINED
>appTrackingUrl: http://dml2:8088/proxy/application_1411578463780_0001/
>appUser: chanda
> 14/09/24 22:44:22 INFO yarn.Client: Application report from ASM: 
>application identifier: application_1411578463780_0001
>appId: 1
>clientToAMToken: null
>appDiagnostics: 
>appMasterHost: dml3
>appQueue: root.chanda
>appMasterRpcPort: 0
>appStartTime: 1411578513545
>yarnAppState: RUNNING
>distributedFinalState: UNDEFINED
>appTrackingUrl: http://dml2:8088/proxy/application_1411578463780_0001/
> 
> 
> 
> 
> -- 
> Regards,
> Raghuveer Chanda
> 4th year Undergraduate Student
> Computer Science and Engineering
> IIT Kharagpur
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org



Re: Multiple Kafka Receivers and Union

2014-09-25 Thread Matt Narrell
I suppose I have other problems, as I can’t get the Scala example to work 
either.  Puzzling, as I have literally coded it like the examples (that are 
purported to work), but no luck.

mn

On Sep 24, 2014, at 11:27 AM, Tim Smith  wrote:

> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
> 
> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell  wrote:
>> The part that works is the commented out, single receiver stream below the 
>> loop.  It seems that when I call KafkaUtils.createStream more than once, I 
>> don’t receive any messages.
>> 
>> I’ll dig through the logs, but at first glance yesterday I didn’t see 
>> anything suspect.  I’ll have to look closer.
>> 
>> mn
>> 
>> On Sep 23, 2014, at 6:14 PM, Tim Smith  wrote:
>> 
>>> Maybe post the before-code as in what was the code before you did the
>>> loop (that worked)? I had similar situations where reviewing code
>>> before (worked) and after (does not work) helped. Also, what helped is
>>> the Scala REPL because I can see what are the object types being
>>> returned by each statement.
>>> 
>>> Other than code, in the driver logs, you should see events that say
>>> "Registered receiver for stream 0 from
>>> akka.tcp://sp...@node5.acme.net:53135"
>>> 
>>> Now, if you goto "node5" and look at Spark or YarnContainer logs
>>> (depending on who's doing RM), you should be able to see if the
>>> receiver has any errors when trying to talk to kafka.
>>> 
>>> 
>>> 
>>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell  
>>> wrote:
>>>> To my eyes, these are functionally equivalent.  I’ll try a Scala approach, 
>>>> but this may cause waves for me upstream (e.g., non-Java)
>>>> 
>>>> Thanks for looking at this.  If anyone else can see a glaring issue in the 
>>>> Java approach that would be appreciated.
>>>> 
>>>> Thanks,
>>>> Matt
>>>> 
>>>> On Sep 23, 2014, at 4:13 PM, Tim Smith  wrote:
>>>> 
>>>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>>>> equivalent (that I have tested to work):
>>>>> 
>>>>> val kInStreams = (1 to 10).map{_ =>
>>>>> KafkaUtils.createStream(ssc,zkhost.acme.net:2182,"myGrp",Map("myTopic"
>>>>> -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
>>>>> across the cluster, one for each partition, potentially but active
>>>>> receivers are only as many kafka partitions you have
>>>>> 
>>>>> val kInMsg = 
>>>>> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell  
>>>>> wrote:
>>>>>> So, this is scrubbed some for confidentiality, but the meat of it is as 
>>>>>> follows.  Note, that if I substitute the commented section for the loop, 
>>>>>> I receive messages from the topic.
>>>>>> 
>>>>>> SparkConf sparkConf = new SparkConf();
>>>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>>>> sparkConf.set("spark.logConf", "true");
>>>>>> 
>>>>>> Map kafkaProps = new HashMap<>();
>>>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>>>> kafkaProps.put("group.id", groupId);
>>>>>> 
>>>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, 
>>>>>> Seconds.apply(1));
>>>>>> jsc.checkpoint("hdfs://");
>>>>>> 
>>>>>> List> streamList = new 
>>>>>> ArrayList<>(5);
>>>>>> 
>>>>>> for (int i = 0; i < 5; i++) {
>>>>>>  streamList.add(KafkaUtils.createStream(jsc,
>>>>>> String.class, 
>>>>>> ProtobufModel.class,
>>>>>> StringDecoder.class, 
>>>>>> ProtobufModelDecoder.class,
>>>>>> kafkaProps,
>>>>>> Collections.singletonMap(topic, 
>>&g

Re: Multiple Kafka Receivers and Union

2014-09-25 Thread Matt Narrell
Tim,

I think I understand this now.  I had a five-node Spark cluster and a 
five-partition topic, and I created five receivers.  I found this:  
http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming
which indicates that if I use all my workers as receivers, there are none left 
to do the processing.  If I drop the number of partitions/receivers down while 
still having multiple unioned receivers, I see messages.

mn

On Sep 25, 2014, at 10:18 AM, Matt Narrell  wrote:

> I suppose I have other problems as I can’t get the Scala example to work 
> either.  Puzzling, as I have literally coded like the examples (that are 
> purported to work), but no luck.
> 
> mn
> 
> On Sep 24, 2014, at 11:27 AM, Tim Smith  wrote:
> 
>> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
>> 
>> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell  wrote:
>>> The part that works is the commented out, single receiver stream below the 
>>> loop.  It seems that when I call KafkaUtils.createStream more than once, I 
>>> don’t receive any messages.
>>> 
>>> I’ll dig through the logs, but at first glance yesterday I didn’t see 
>>> anything suspect.  I’ll have to look closer.
>>> 
>>> mn
>>> 
>>> On Sep 23, 2014, at 6:14 PM, Tim Smith  wrote:
>>> 
>>>> Maybe post the before-code as in what was the code before you did the
>>>> loop (that worked)? I had similar situations where reviewing code
>>>> before (worked) and after (does not work) helped. Also, what helped is
>>>> the Scala REPL because I can see what are the object types being
>>>> returned by each statement.
>>>> 
>>>> Other than code, in the driver logs, you should see events that say
>>>> "Registered receiver for stream 0 from
>>>> akka.tcp://sp...@node5.acme.net:53135"
>>>> 
>>>> Now, if you goto "node5" and look at Spark or YarnContainer logs
>>>> (depending on who's doing RM), you should be able to see if the
>>>> receiver has any errors when trying to talk to kafka.
>>>> 
>>>> 
>>>> 
>>>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell  
>>>> wrote:
>>>>> To my eyes, these are functionally equivalent.  I’ll try a Scala 
>>>>> approach, but this may cause waves for me upstream (e.g., non-Java)
>>>>> 
>>>>> Thanks for looking at this.  If anyone else can see a glaring issue in 
>>>>> the Java approach that would be appreciated.
>>>>> 
>>>>> Thanks,
>>>>> Matt
>>>>> 
>>>>> On Sep 23, 2014, at 4:13 PM, Tim Smith  wrote:
>>>>> 
>>>>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>>>>> equivalent (that I have tested to work):
>>>>>> 
>>>>>> val kInStreams = (1 to 10).map{_ =>
>>>>>> KafkaUtils.createStream(ssc,zkhost.acme.net:2182,"myGrp",Map("myTopic"
>>>>>> -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
>>>>>> across the cluster, one for each partition, potentially but active
>>>>>> receivers are only as many kafka partitions you have
>>>>>> 
>>>>>> val kInMsg = 
>>>>>> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell  
>>>>>> wrote:
>>>>>>> So, this is scrubbed some for confidentiality, but the meat of it is as 
>>>>>>> follows.  Note, that if I substitute the commented section for the 
>>>>>>> loop, I receive messages from the topic.
>>>>>>> 
>>>>>>> SparkConf sparkConf = new SparkConf();
>>>>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>>>>> sparkConf.set("spark.logConf", "true");
>>>>>>> 
>>>>>>> Map kafkaProps = new HashMap<>();
>>>>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>>>>> kafkaProps.put("group.id", groupId);
>>>>>>> 
>>>>>>> JavaStreamingC

Re: Multiple Kafka Receivers and Union

2014-09-25 Thread Matt Narrell
Additionally,

If I dial up/down the number of executor cores, this does what I want.  Thanks 
for the extra eyes!

mn

On Sep 25, 2014, at 12:34 PM, Matt Narrell  wrote:

> Tim,
> 
> I think I understand this now.  I had a five-node Spark cluster and a 
> five-partition topic, and I created five receivers.  I found this:  
> http://stackoverflow.com/questions/25785581/custom-receiver-stalls-worker-in-spark-streaming
> which indicates that if I use all my workers as receivers, there are none left 
> to do the processing.  If I drop the number of partitions/receivers while still 
> having multiple unioned receivers, I see messages.
> 
> mn
> 
> On Sep 25, 2014, at 10:18 AM, Matt Narrell  wrote:
> 
>> I suppose I have other problems as I can’t get the Scala example to work 
>> either.  Puzzling, as I have literally coded like the examples (that are 
>> purported to work), but no luck.
>> 
>> mn
>> 
>> On Sep 24, 2014, at 11:27 AM, Tim Smith  wrote:
>> 
>>> Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
>>> 
>>> On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell  
>>> wrote:
>>>> The part that works is the commented out, single receiver stream below the 
>>>> loop.  It seems that when I call KafkaUtils.createStream more than once, I 
>>>> don’t receive any messages.
>>>> 
>>>> I’ll dig through the logs, but at first glance yesterday I didn’t see 
>>>> anything suspect.  I’ll have to look closer.
>>>> 
>>>> mn
>>>> 
>>>> On Sep 23, 2014, at 6:14 PM, Tim Smith  wrote:
>>>> 
>>>>> Maybe post the before-code as in what was the code before you did the
>>>>> loop (that worked)? I had similar situations where reviewing code
>>>>> before (worked) and after (does not work) helped. Also, what helped is
>>>>> the Scala REPL because I can see what are the object types being
>>>>> returned by each statement.
>>>>> 
>>>>> Other than code, in the driver logs, you should see events that say
>>>>> "Registered receiver for stream 0 from
>>>>> akka.tcp://sp...@node5.acme.net:53135"
>>>>> 
>>>>> Now, if you goto "node5" and look at Spark or YarnContainer logs
>>>>> (depending on who's doing RM), you should be able to see if the
>>>>> receiver has any errors when trying to talk to kafka.
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell  
>>>>> wrote:
>>>>>> To my eyes, these are functionally equivalent.  I’ll try a Scala 
>>>>>> approach, but this may cause waves for me upstream (e.g., non-Java)
>>>>>> 
>>>>>> Thanks for looking at this.  If anyone else can see a glaring issue in 
>>>>>> the Java approach that would be appreciated.
>>>>>> 
>>>>>> Thanks,
>>>>>> Matt
>>>>>> 
>>>>>> On Sep 23, 2014, at 4:13 PM, Tim Smith  wrote:
>>>>>> 
>>>>>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>>>>>> equivalent (that I have tested to work):
>>>>>>> 
>>>>>>> val kInStreams = (1 to 10).map{_ =>
>>>>>>> KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic"
>>>>>>> -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers
>>>>>>> across the cluster, one for each partition, potentially but active
>>>>>>> receivers are only as many kafka partitions you have
>>>>>>> 
>>>>>>> val kInMsg = 
>>>>>>> ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell  
>>>>>>> wrote:
>>>>>>>> So, this is scrubbed some for confidentiality, but the meat of it is 
>>>>>>>> as follows.  Note, that if I substitute the commented section for the 
>>>>>>>> loop, I receive messages from the topic.
>>>>>>>> 
>>>>>>>> SparkConf sparkConf = new SparkConf();
>>>>>>>> sparkConf.set("spark.str

Re: SPARK UI - Details post job processing

2014-09-25 Thread Matt Narrell
How does this work with a cluster manager like YARN?

mn

On Sep 25, 2014, at 2:23 PM, Andrew Or  wrote:

> Hi Harsha,
> 
> You can turn on `spark.eventLog.enabled` as documented here: 
> http://spark.apache.org/docs/latest/monitoring.html. Then, if you are running 
> standalone mode, you can access the finished SparkUI through the Master UI. 
> Otherwise, you can start a HistoryServer to display finished UIs.
> 
> -Andrew
> 
> 2014-09-25 12:55 GMT-07:00 Harsha HN <99harsha.h@gmail.com>:
> Hi,
> 
> The details laid out in the Spark UI for a job in progress are really 
> interesting and very useful, but they vanish once the job is done. 
> Is there a way to get job details post processing? 
> 
> Looking for Spark UI data, not standard input,output and error info.
> 
> Thanks,
> Harsha
> 



Re: SPARK UI - Details post job processing

2014-09-26 Thread Matt Narrell
Yes, I’m running Hadoop’s Timeline server that does this for the YARN/Hadoop 
logs (and works very nicely btw).  Are you saying I can do the same for the 
SparkUI as well?  Also, where do I set these Spark configurations since this 
will be executed inside a YARN container?  On the “client” machine via 
spark-env.sh?  Do I pass these as command line arguments to spark-submit?  Do I 
set them explicitly on my SparkConf?
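
In case it helps frame the question, here's my best guess at what it would look like 
as spark-submit arguments (untested on my end; the HDFS path and class name are made up):

spark-submit \
  --master yarn-cluster \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=hdfs:///user/spark/applicationHistory \
  --class com.acme.MyJob my-job.jar

The history server would then presumably read the same directory via 
spark.history.fs.logDirectory.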

Thanks in advance.

mn

On Sep 25, 2014, at 9:13 PM, Andrew Ash  wrote:

> Matt you should be able to set an HDFS path so you'll get logs written to a 
> unified place instead of to local disk on a random box on the cluster.
> 
> On Thu, Sep 25, 2014 at 1:38 PM, Matt Narrell  wrote:
> How does this work with a cluster manager like YARN?
> 
> mn
> 
> On Sep 25, 2014, at 2:23 PM, Andrew Or  wrote:
> 
>> Hi Harsha,
>> 
>> You can turn on `spark.eventLog.enabled` as documented here: 
>> http://spark.apache.org/docs/latest/monitoring.html. Then, if you are 
>> running standalone mode, you can access the finished SparkUI through the 
>> Master UI. Otherwise, you can start a HistoryServer to display finished UIs.
>> 
>> -Andrew
>> 
>> 2014-09-25 12:55 GMT-07:00 Harsha HN <99harsha.h@gmail.com>:
>> Hi,
>> 
>> The details laid out in the Spark UI for a job in progress are really 
>> interesting and very useful, but they vanish once the job is done. 
>> Is there a way to get job details post processing? 
>> 
>> Looking for Spark UI data, not standard input,output and error info.
>> 
>> Thanks,
>> Harsha
>> 
> 
> 



Re: Transforming the Dstream vs transforming each RDD in the Dstream.

2014-10-20 Thread Matt Narrell
http://spark.apache.org/docs/latest/streaming-programming-guide.html 


foreachRDD is executed on the driver.  That is, the function you pass to foreachRDD 
runs in the driver process once per batch; only the operations you invoke on the RDD 
inside it are shipped out to the executors.
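
A rough sketch of which part runs where (assuming dstream, loadTargets, isTarget and 
writeToStore are defined elsewhere; they are just placeholders here):

dstream.foreachRDD { rdd =>
  // this block runs in the driver, once per batch interval
  val targets = loadTargets()
  targets.foreach { target =>
    // filter is a lazy transformation; the real work happens on the executors
    rdd.filter(record => isTarget(target, record))
       .foreachPartition(partition => writeToStore(target, partition))
  }
}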

mn

> On Oct 20, 2014, at 3:07 AM, Gerard Maas  wrote:
> 
> Pinging TD  -- I'm sure you know :-)
> 
> -kr, Gerard.
> 
> On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas  > wrote:
> Hi,
> 
> We have been implementing several Spark Streaming jobs that are basically 
> processing data and inserting it into Cassandra, sorting it among different 
> keyspaces.
> 
> We've been following the pattern:
> 
> dstream.foreachRDD { rdd => 
>   val records = rdd.map(elem => record(elem))
>   targets.foreach(target => records.filter{record => 
>     isTarget(target,record)}.writeToCassandra(target,table))
> }
> 
> I've been wondering whether there would be a performance difference in 
> transforming the dstream instead of transforming the RDD within the dstream 
> with regards to how the transformations get scheduled.
> 
> Instead of the RDD-centric computation, I could transform the dstream until 
> the last step, where I need an rdd to store.
> For example, the  previous  transformation could be written as:
> 
> val recordStream = dstream.map(elem => record(elem))
> targets.foreach{target => recordStream.filter(record => 
> isTarget(target,record)).foreachRDD(_.writeToCassandra(target,table))}
> 
> Would there be a difference in execution and/or performance?  What would be the 
> preferred way to do this?
> 
> Bonus question: Is there a better (more performant) way to sort the data into 
> different "buckets" than filtering the whole collection once per bucket?
> 
> thanks,  Gerard.
> 
> 



Re: Scala Spark IDE help

2014-10-28 Thread Matt Narrell
So, I'm using IntelliJ 13.x and Scala Spark jobs.

Make sure your jobs are singletons (objects, not classes), then simply debug the 
main function.  You'll need to set your master to some derivation of "local", but 
that's it.  Spark Streaming is kinda wonky when debugging, but data-at-rest 
behaves like you'd expect.
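
For example, something along these lines is directly debuggable from the IDE (a 
minimal sketch; the app name and data are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits (needed on pre-1.3 Spark)

// A singleton object whose main you can run/debug straight from the IDE.
object DebugWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("debug-local")
      .setMaster("local[2]")  // everything runs in the IDE's JVM, so breakpoints hit
    val sc = new SparkContext(conf)

    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()

    counts.foreach(println)
    sc.stop()
  }
}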

hth

mn

> On Oct 27, 2014, at 3:03 PM, Eric Tanner  wrote:
> 
> I am a Scala / Spark newbie (attending Paco Nathan's class).  
> 
> What I need is some advice on how to set up IntelliJ (or Eclipse) so that I can 
> attach the debugger to the executing process.  I know that this is not feasible 
> if the code is executing within the cluster.  However, if Spark is running 
> locally (on my laptop), I would like to attach the debugger to the locally 
> running Spark program so I can step through it.
> 
> Any advice will be is helpful.
> 
> Eric
> 
> 



Re: Spark to eliminate full-table scan latency

2014-10-28 Thread Matt Narrell
I've been puzzled by this lately.  I too would like to use the Thrift server to 
provide JDBC-style access to datasets via Spark SQL.  Is this possible?  The 
examples show temp tables created during the lifetime of a SparkContext.  I 
assume I can use Spark SQL to query those tables while the context is active, 
but what happens when the context is stopped?  I can no longer query those 
tables via the Thrift server.  Do I need Hive in this scenario?  I don't want 
to rebuild the Spark distribution unless absolutely necessary.

From the examples, it looks like Spark SQL is syntactic sugar for manipulating an 
RDD, but if I need external access to this data, I need a separate store outside 
of Spark (Mongo/Cassandra/HDFS/etc.).  Am I correct here?
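
For reference, the pattern I'm talking about is roughly this (a Spark 1.1-style 
sketch; the Feature class, the path and the query are made up, and sc is an existing 
SparkContext):

import org.apache.spark.sql.SQLContext

case class Feature(id: Long, name: String, weight: Double)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD  // implicit RDD -> SchemaRDD conversion

val features = sc.textFile("hdfs:///data/features.csv")
  .map(_.split(","))
  .map(f => Feature(f(0).toLong, f(1), f(2).toDouble))

// Only visible while this SparkContext/SQLContext is alive
features.registerTempTable("features")

val heavy = sqlContext.sql("SELECT name, weight FROM features WHERE weight > 0.5")
heavy.collect().foreach(println)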

Thanks,

mn

> On Oct 27, 2014, at 7:43 PM, Ron Ayoub  wrote:
> 
> This does look like it provides a good way to allow other processes to access 
> the contents of an RDD in a separate app? Is there any other general purpose 
> mechanism for serving up RDD data? I understand that the driver app and 
> workers all are app specific and run in separate executors but would be cool 
> if there was some general way to create a server app based on Spark. Perhaps 
> Spark SQL is that general way and I'll soon find out. Thanks. 
> 
> From: mich...@databricks.com
> Date: Mon, 27 Oct 2014 14:35:46 -0700
> Subject: Re: Spark to eliminate full-table scan latency
> To: ronalday...@live.com
> CC: user@spark.apache.org
> 
> You can access cached data in spark through the JDBC server:
> 
> http://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbc-server
>  
> 
> 
> On Mon, Oct 27, 2014 at 1:47 PM, Ron Ayoub  > wrote:
> We have a table containing 25 features per item id along with feature 
> weights. A correlation matrix can be constructed for every feature pair based 
> on co-occurrence. If a user inputs a feature they can find out the features 
> that are correlated with a self-join requiring a single full table scan. This 
> results in high latency for big data (10 seconds +) due to the IO involved in 
> the full table scan. My idea is for this feature the data can be loaded into 
> an RDD and transformations and actions can be applied to find out per query 
> what are the correlated features. 
> 
> I'm pretty sure Spark can do this sort of thing. Since I'm new, what I'm not 
> sure about is, is Spark appropriate as a server application? For instance, 
> the driver application would have to load the RDD and then listen for requests 
> and return results, perhaps using a socket?  Are there any libraries to 
> facilitate this sort of Spark server app? So I understand how Spark can be 
> used to grab data, run algorithms, and put results back but is it appropriate 
> as the engine of a server app and what are the general patterns involved?



Re: Submitting Spark application through code

2014-10-28 Thread Matt Narrell
Can this be done?  Can I just spin up a SparkContext programmatically, point it 
at my YARN cluster, and have it work like spark-submit?  Doesn't (at least) the 
application JAR need to be distributed to the workers via HDFS or the like for 
the jobs to run?
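
In other words, is something like this (just a guess on my part; the jar path is 
made up) really enough?

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("NetworkCount")
  .setMaster("yarn-client")  // HADOOP_CONF_DIR has to point at the cluster's config
  .setJars(Seq("target/simple-project-1.0-jar-with-dependencies.jar"))  // ship the app jar
val sc = new SparkContext(conf)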

mn

> On Oct 28, 2014, at 2:29 AM, Akhil Das  wrote:
> 
> How about directly running it?
> 
> val ssc = new StreamingContext("local[2]","Network WordCount",Seconds(5),
>   "/home/akhld/mobi/localclusterxx/spark-1")
> 
> 
> val lines=ssc.socketTextStream("localhost", 12345)
> 
> val words = lines.flatMap(_.split(" "))
> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
> wordCounts.print()
> 
> ssc.start()
> ssc.awaitTermination()
> 
> Thanks
> Best Regards
> 
> On Tue, Oct 28, 2014 at 1:50 PM, sivarani  > wrote:
> Hi,
> 
> I am submitting my Spark application in the following fashion:
> 
> bin/spark-submit --class "NetworkCount" --master spark://abc.test.com:7077 \
>   try/simple-project/target/simple-project-1.0-jar-with-dependencies.jar
> 
> But is there any other way to submit a Spark application through code?
> 
> For example, I am checking a condition, and if it is true I want to run the
> Spark application:
> 
> if (isConditionTrue) {
>   runSpark("NetworkCount", "masterurl", "jar")
> }
> 
> I am aware we can set the jar and master URL on the SparkContext, but how do I
> run it from code automatically when a condition becomes true, without actually
> using spark-submit?
> 
> Is it possible?
> 
> 
> 
> 
> 
> 



Re: spark-submit and logging

2014-11-20 Thread Matt Narrell
How do I configure which files get uploaded to the YARN containers?  So far, I've 
only seen "--conf spark.yarn.jar=hdfs://…", which lets me specify the HDFS 
location of the Spark JAR, but I'm not sure how to specify other files for 
upload (e.g., spark-env.sh).
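
For what it's worth, my best guess at the full incantation is below (untested; the 
paths and class name are made up):

# --files drops log4j.properties into each container's working directory,
# so the -D flag can then reference it by bare file name
spark-submit \
  --master yarn-cluster \
  --files /local/path/log4j.properties \
  --conf spark.yarn.jar=hdfs:///libs/spark-assembly.jar \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --class com.acme.MyJob my-job.jar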

mn

> On Nov 20, 2014, at 4:08 AM, Sean Owen  wrote:
> 
> I think the standard practice is to include your log config file among
> the files uploaded to YARN containers, and then set
> -Dlog4j.configuration=yourfile.xml in
> spark.{executor,driver}.extraJavaOptions ?
> 
> http://spark.apache.org/docs/latest/running-on-yarn.html
> 
> On Thu, Nov 20, 2014 at 9:20 AM, Tobias Pfeiffer  wrote:
>> Hi,
>> 
>> I am using spark-submit to submit my application jar to a YARN cluster.  I
>> want to deliver a single jar file to my users, so I would like to avoid to
>> tell them "also, please put that log4j.xml file somewhere and add that path
>> to the spark-submit command".
>> 
>> I thought it would be sufficient that my application jar file contains a
>> log4j.xml file, but that does not seem to be the case.  If I don't add a
>> log4j.xml file to the classpath before launching spark-submit, the one
>> bundled with spark will be used -- which has a negative influence on my
>> program execution.  Is there any way I can tell spark-submit to use the
>> log4j configuration bundled in my jar file?
>> 
>> Thanks
>> Tobias
> 
> 





Re: Execute Spark programs from local machine on Yarn-hadoop cluster

2014-11-23 Thread Matt Narrell
I think this IS possible.

You must set the HADOOP_CONF_DIR variable on the machine where you're running the 
Java process that creates the SparkContext.  The Hadoop configuration specifies 
the YARN ResourceManager IPs, and Spark will use that configuration.
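
Roughly like this (a sketch; the config path is made up):

// On the submitting machine, HADOOP_CONF_DIR points at a local copy of the cluster's
// yarn-site.xml / core-site.xml / hdfs-site.xml, e.g.:
//   export HADOOP_CONF_DIR=/path/to/cluster-conf
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("NewJob")
  .setMaster("yarn-client")  // driver stays local, executors run in YARN containers
val sc = new SparkContext(conf)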

mn

> On Nov 21, 2014, at 8:10 AM, Prannoy  wrote:
> 
> Hi Naveen, 
> 
> I don't think this is possible. If you are setting the master with your 
> cluster details, you cannot execute any job from your local machine. You have 
> to execute the jobs inside your YARN machine so that SparkConf is able to 
> connect with all the provided details. 
> 
> If this is not the case, please give a detailed explanation of what exactly you 
> are trying to do :)
> 
> Thanks.
> 
> On Fri, Nov 21, 2014 at 8:11 PM, Naveen Kumar Pokala [via Apache Spark User 
> List] <[hidden email] 
> > wrote:
> Hi,
> 
>  
> 
> I am executing my Spark jobs on a YARN cluster by forming the conf object in 
> the following way.
> 
>  
> 
> SparkConf conf = new 
> SparkConf().setAppName("NewJob").setMaster("yarn-cluster");
> 
>  
> 
> Now I want to execute Spark jobs from my local machine; how do I do that?
> 
>  
> 
> What I mean is: is there a way to give the IP address, port, and all the other 
> details needed to connect to a master (YARN) on some other network from my 
> local Spark program?
> 
>  
> 
> -Naveen
> 
> 
> 



Re: Spark Job submit

2014-12-01 Thread Matt Narrell
Or set the HADOOP_CONF_DIR environment variable.  Either way, you must have the YARN 
configuration available to the submitting application to allow for the use of 
"yarn-client" or "yarn-cluster".

The attached stack trace below doesn't provide any information as to why the 
job failed.

mn

> On Nov 27, 2014, at 12:14 AM, Akhil Das  wrote:
> 
> Try to add your cluster's core-site.xml, yarn-site.xml, and hdfs-site.xml to 
> the CLASSPATH (and on SPARK_CLASSPATH) and submit the job.
> 
> Thanks
> Best Regards
> 
> On Thu, Nov 27, 2014 at 12:24 PM, Naveen Kumar Pokala 
> mailto:npok...@spcapitaliq.com>> wrote:
> The code is on my Windows machine and the cluster is on some other network, 
> running UNIX. In this case, how will it identify the cluster? For a Spark 
> cluster we can clearly specify the URL, like spark://ip:port, but in the case 
> of Hadoop how do we specify that?
> 
>  
> 
> What I have done is copy the Hadoop configuration files from the network to my 
> local machine and create a dummy Hadoop directory (on the Windows machine).
> 
>  
> 
> I submitted via spark-submit after pointing the HADOOP_CONF_DIR variable at the 
> dummy files' location.  Attaching the error.
> 
>  
> 
>  
> 
> 
> 
>  
> 
> Please suggest how to proceed from the code, and how to execute via 
> spark-submit from a Windows machine.
> 
>  
> 
> Please provide me sample code if you have any.
> 
>  
> 
> -Naveen
> 
>  
> 
> From: Akhil Das [mailto:ak...@sigmoidanalytics.com 
> ] 
> Sent: Wednesday, November 26, 2014 10:03 PM
> To: Naveen Kumar Pokala
> Cc: user@spark.apache.org 
> Subject: Re: Spark Job submit
> 
>  
> 
> How about?
> 
>  
> 
> - Create a SparkContext 
> 
> - setMaster as yarn-cluster
> 
> - Create a JavaSparkContext with the above SparkContext
> 
>  
> 
> And that will submit it to the yarn cluster.
> 
> 
> 
> Thanks
> 
> Best Regards
> 
>  
> 
> On Wed, Nov 26, 2014 at 4:20 PM, Naveen Kumar Pokala  > wrote:
> 
> Hi.
> 
>  
> 
> Is there a way to submit a Spark job to a Hadoop-YARN cluster from Java code?
> 
>  
> 
> -Naveen
> 
>  
> 
> 



Re: Is there a way to force spark to use specific ips?

2014-12-06 Thread Matt Narrell
It's much easier if you access your nodes by name.  If you're using Vagrant, use 
the hosts provisioner:  https://github.com/adrienthebo/vagrant-hosts
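
If you'd rather not pull in a plugin, the by-hand equivalent is consistent /etc/hosts 
entries on every box (and on the machine running the driver); the IPs and names here 
are made up:

192.168.40.1    driver-host
192.168.40.10   spark-master
192.168.40.11   spark-worker-1
192.168.40.12   spark-worker-2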


mn

> On Dec 6, 2014, at 8:37 AM, Ashic Mahtab  wrote:
> 
> Hi,
> It appears that spark is always attempting to use the driver's hostname to 
> connect / broadcast. This is usually fine, except when the cluster doesn't 
> have DNS configured. For example, in a vagrant cluster with a private 
> network. The workers and masters, and the host (where the driver runs from) 
> can all see each other by ip. I can also specify --conf 
> "spark.driver.host=192.168.40.1", and that results in the workers being able 
> to connect to the driver. However, when trying to broadcast anything, it's 
> still trying to use the hostname of the host. Now, I can set up a host entry 
> in /etc/hosts, but I was wondering if there's a way to avoid that hassle. 
> Is there any way I can force Spark to always use IPs and not hostnames?
> 
> Thanks,
> Ashic.