Re: Correct SparkLauncher usage

2016-11-12 Thread Elkhan Dadashov
Hey Mohammad,

I implemented the code using CountDownLatch, and SparkLauncher works as
expected. Hope it helps.

Whenever appHandle.getState() reaches one of the final states, the
countDownLatch is decremented and execution returns to the main program.


...
final CountDownLatch countDownLatch = new CountDownLatch(1);
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
Thread sparkAppListenerThread = new Thread(sparkAppListener);
sparkAppListenerThread.start();

// More generic way of writing the above three lines (using a
// factory-style getter for the listener):
// SparkAppHandle.Listener sparkAppListener = new SparkAppListener(countDownLatch);
// SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
// Thread sparkAppListenerThread = new Thread((Runnable) sparkAppListener);

long timeout = 120;
countDownLatch.await(timeout, TimeUnit.SECONDS);
...

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {
    private static final Log log = LogFactory.getLog(SparkAppListener.class);
    private final CountDownLatch countDownLatch;

    public SparkAppListener(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void stateChanged(SparkAppHandle handle) {
        String sparkAppId = handle.getAppId();
        State appState = handle.getState();
        if (sparkAppId != null) {
            log.info("Spark job with app id: " + sparkAppId
                + ",\tState changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
        } else {
            log.info("Spark job's state changed to: " + appState
                + " - " + SPARK_STATE_MSG.get(appState));
        }
        if (appState != null && appState.isFinal()) {
            countDownLatch.countDown();
        }
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {}

    @Override
    public void run() {}
}
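
A small extension to the snippet above (a sketch, not part of the original
code): await(timeout, unit) returns false when the timeout elapses before
the latch reaches zero, so the caller can tell a finished job from a stuck
one and react, e.g.:

// Reuses appHandle, countDownLatch and timeout from the snippet above.
boolean finished = countDownLatch.await(timeout, TimeUnit.SECONDS);
if (!finished) {
    // No final state within the timeout: stop() asks the application to
    // exit cleanly; kill() would terminate the child process if that fails.
    appHandle.stop();
}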


On Thu, Nov 10, 2016 at 3:08 PM Mohammad Tariq  wrote:

> Sure, will look into the tests.
>
> Thank you so much for your time!
>
>
>
> Tariq, Mohammad
> about.me/mti
>
> On Fri, Nov 11, 2016 at 4:35 AM, Marcelo Vanzin 
> wrote:
>
> Sorry, it's kinda hard to give any more feedback from just the info you
> provided.
>
> I'd start with some working code like this from Spark's own unit tests:
>
> https://github.com/apache/spark/blob/a8ea4da8d04c1ed621a96668118f20739145edd2/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala#L164
>
>
> On Thu, Nov 10, 2016 at 3:00 PM, Mohammad Tariq 
> wrote:
>
> All I want to do is submit a job, keep getting the state as soon as it
> changes, and come out once the job is over. I'm sorry to be a pest with
> all these questions; I'm having a bit of a tough time making this work.
>
>
> Tariq, Mohammad
> about.me/mti
>
> On Fri, Nov 11, 2016 at 4:27 AM, Mohammad Tariq 
> wrote:
>
> Yeah, that definitely makes sense. I was just trying to make it work
> somehow. The problem is that it's not calling the listeners at all, so
> I'm unable to do anything. I just wanted to cross-check it by looping
> inside. But I get the point. Thank you for that!
>
> I'm on YARN (cluster mode).
>
>
> Tariq, Mohammad
> about.me/mti
>
> On Fri, Nov 11, 2016 at 4:19 AM, Marcelo Vanzin 
> wrote:
>
> On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq 
> wrote:
> >   @Override
> >   public void stateChanged(SparkAppHandle handle) {
> >     System.out.println("Spark App Id [" + handle.getAppId() + "]. State [" + handle.getState() + "]");
> >     while (!handle.getState().isFinal()) {
>
> You shouldn't loop in an event handler. That's not really how
> listeners work. Instead, use the event handler to update some local
> state, or signal some thread that's waiting for the state change.
>
> Also be aware that handles currently only work in local and yarn
> modes; the state updates haven't been hooked up to standalone mode
> (maybe for client mode, but definitely not cluster) nor mesos.
>
> --
> Marcelo
>
>

Re: Correct SparkLauncher usage

2016-11-10 Thread Mohammad Tariq
Sure, will look into the tests.

Thank you so much for your time!


Tariq, Mohammad
about.me/mti

On Fri, Nov 11, 2016 at 4:35 AM, Marcelo Vanzin  wrote:

> Sorry, it's kinda hard to give any more feedback from just the info you
> provided.
>
> I'd start with some working code like this from Spark's own unit tests:
> https://github.com/apache/spark/blob/a8ea4da8d04c1ed621a96668118f20739145edd2/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala#L164
>
>
> On Thu, Nov 10, 2016 at 3:00 PM, Mohammad Tariq 
> wrote:
>
>> All I want to do is submit a job, keep getting the state as soon as it
>> changes, and come out once the job is over. I'm sorry to be a pest with
>> all these questions; I'm having a bit of a tough time making this work.
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>>
>> On Fri, Nov 11, 2016 at 4:27 AM, Mohammad Tariq 
>> wrote:
>>
>>> Yeah, that definitely makes sense. I was just trying to make it work
>>> somehow. The problem is that it's not calling the listeners at all, so
>>> I'm unable to do anything. I just wanted to cross-check it by looping
>>> inside. But I get the point. Thank you for that!
>>>
>>> I'm on YARN (cluster mode).
>>>
>>>
>>> Tariq, Mohammad
>>> about.me/mti
>>>
>>> On Fri, Nov 11, 2016 at 4:19 AM, Marcelo Vanzin 
>>> wrote:
>>>
>>>> On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq  wrote:
>>>> >   @Override
>>>> >   public void stateChanged(SparkAppHandle handle) {
>>>> >     System.out.println("Spark App Id [" + handle.getAppId() + "]. State [" + handle.getState() + "]");
>>>> >     while (!handle.getState().isFinal()) {
>>>>
>>>> You shouldn't loop in an event handler. That's not really how
>>>> listeners work. Instead, use the event handler to update some local
>>>> state, or signal some thread that's waiting for the state change.
>>>>
>>>> Also be aware that handles currently only work in local and yarn
>>>> modes; the state updates haven't been hooked up to standalone mode
>>>> (maybe for client mode, but definitely not cluster) nor mesos.
>>>>
>>>> --
>>>> Marcelo

>>>
>>>
>>
>
>
> --
> Marcelo
>


Re: Correct SparkLauncher usage

2016-11-10 Thread Marcelo Vanzin
Sorry, it's kinda hard to give any more feedback from just the info you
provided.

I'd start with some working code like this from Spark's own unit tests:
https://github.com/apache/spark/blob/a8ea4da8d04c1ed621a96668118f20739145edd2/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala#L164


On Thu, Nov 10, 2016 at 3:00 PM, Mohammad Tariq  wrote:

> All I want to do is submit a job, keep getting the state as soon as it
> changes, and come out once the job is over. I'm sorry to be a pest with
> all these questions; I'm having a bit of a tough time making this work.
>
>
> Tariq, Mohammad
> about.me/mti
>
> On Fri, Nov 11, 2016 at 4:27 AM, Mohammad Tariq 
> wrote:
>
>> Yeah, that definitely makes sense. I was just trying to make it work
>> somehow. The problem is that it's not calling the listeners at all, so
>> I'm unable to do anything. I just wanted to cross-check it by looping
>> inside. But I get the point. Thank you for that!
>>
>> I'm on YARN (cluster mode).
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>>
>> On Fri, Nov 11, 2016 at 4:19 AM, Marcelo Vanzin 
>> wrote:
>>
>>> On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq 
>>> wrote:
>>> >   @Override
>>> >   public void stateChanged(SparkAppHandle handle) {
>>> >     System.out.println("Spark App Id [" + handle.getAppId() + "]. State [" + handle.getState() + "]");
>>> >     while (!handle.getState().isFinal()) {
>>>
>>> You shouldn't loop in an event handler. That's not really how
>>> listeners work. Instead, use the event handler to update some local
>>> state, or signal some thread that's waiting for the state change.
>>>
>>> Also be aware that handles currently only work in local and yarn
>>> modes; the state updates haven't been hooked up to standalone mode
>>> (maybe for client mode, but definitely not cluster) nor mesos.
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>


-- 
Marcelo


Re: Correct SparkLauncher usage

2016-11-10 Thread Mohammad Tariq
All I want to do is submit a job, keep getting the state as soon as it
changes, and come out once the job is over. I'm sorry to be a pest with
all these questions; I'm having a bit of a tough time making this work.


Tariq, Mohammad
about.me/mti

On Fri, Nov 11, 2016 at 4:27 AM, Mohammad Tariq  wrote:

> Yeah, that definitely makes sense. I was just trying to make it work
> somehow. The problem is that it's not calling the listeners at all, so
> I'm unable to do anything. I just wanted to cross-check it by looping
> inside. But I get the point. Thank you for that!
>
> I'm on YARN (cluster mode).
>
>
> Tariq, Mohammad
> about.me/mti
>
> On Fri, Nov 11, 2016 at 4:19 AM, Marcelo Vanzin 
> wrote:
>
>> On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq 
>> wrote:
>> >   @Override
>> >   public void stateChanged(SparkAppHandle handle) {
>> >     System.out.println("Spark App Id [" + handle.getAppId() + "]. State [" + handle.getState() + "]");
>> >     while (!handle.getState().isFinal()) {
>>
>> You shouldn't loop in an event handler. That's not really how
>> listeners work. Instead, use the event handler to update some local
>> state, or signal some thread that's waiting for the state change.
>>
>> Also be aware that handles currently only work in local and yarn
>> modes; the state updates haven't been hooked up to standalone mode
>> (maybe for client mode, but definitely not cluster) nor mesos.
>>
>> --
>> Marcelo
>>
>
>


Re: Correct SparkLauncher usage

2016-11-10 Thread Mohammad Tariq
Yeah, that definitely makes sense. I was just trying to make it work
somehow. The problem is that it's not calling the listeners at all, so
I'm unable to do anything. I just wanted to cross-check it by looping
inside. But I get the point. Thank you for that!

I'm on YARN (cluster mode).


Tariq, Mohammad
about.me/mti

On Fri, Nov 11, 2016 at 4:19 AM, Marcelo Vanzin  wrote:

> On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq 
> wrote:
> >   @Override
> >   public void stateChanged(SparkAppHandle handle) {
> >     System.out.println("Spark App Id [" + handle.getAppId() + "]. State [" + handle.getState() + "]");
> >     while (!handle.getState().isFinal()) {
>
> You shouldn't loop in an event handler. That's not really how
> listeners work. Instead, use the event handler to update some local
> state, or signal some thread that's waiting for the state change.
>
> Also be aware that handles currently only work in local and yarn
> modes; the state updates haven't been hooked up to standalone mode
> (maybe for client mode, but definitely not cluster) nor mesos.
>
> --
> Marcelo
>


Re: Correct SparkLauncher usage

2016-11-10 Thread Marcelo Vanzin
On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq  wrote:
>   @Override
>   public void stateChanged(SparkAppHandle handle) {
>     System.out.println("Spark App Id [" + handle.getAppId() + "]. State [" + handle.getState() + "]");
>     while (!handle.getState().isFinal()) {

You shouldn't loop in an event handler. That's not really how
listeners work. Instead, use the event handler to update some local
state, or signal some thread that's waiting for the state change.
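
For instance (an illustrative sketch added here, not code from this thread),
the handler can push each state onto a thread-safe queue that a waiting
thread consumes, using only the public org.apache.spark.launcher API:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.spark.launcher.SparkAppHandle;

public class QueueingListener implements SparkAppHandle.Listener {
    private final BlockingQueue<SparkAppHandle.State> states =
        new LinkedBlockingQueue<>();

    @Override
    public void stateChanged(SparkAppHandle handle) {
        states.offer(handle.getState()); // record and return; never block here
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {}

    // The waiting thread blocks here, not inside the handler.
    public SparkAppHandle.State awaitNextState() throws InterruptedException {
        return states.take();
    }
}

// Usage: pass the listener to startApplication(), then in the main thread:
//   SparkAppHandle.State s;
//   do { s = listener.awaitNextState(); } while (!s.isFinal());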

Also be aware that handles currently only work in local and yarn
modes; the state updates haven't been hooked up to standalone mode
(maybe for client mode, but definitely not cluster) nor mesos.

-- 
Marcelo




Re: Correct SparkLauncher usage

2016-11-10 Thread Mohammad Tariq
Hi Marcelo,

After a few changes I got it working. However, I could not understand one
thing: I need to call Thread.sleep() and then get the state explicitly in
order to make it work.

Also, no matter what I do, my launcher program doesn't call stateChanged()
or infoChanged(). Here is my code:

public class RMLauncher implements SparkAppHandle.Listener {

  public static void main(String[] args) {
    Map<String, String> map = new HashMap<>();
    map.put("HADOOP_CONF_DIR", "/etc/hadoop/conf");
    map.put("KRB5CCNAME", "/tmp/sparkjob");
    map.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
    launchSparkJob(map);
  }

  public static void launchSparkJob(Map<String, String> map) {
    SparkAppHandle handle = null;
    try {
      handle = new SparkLauncher(map).startApplication();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void stateChanged(SparkAppHandle handle) {
    System.out.println("Spark App Id [" + handle.getAppId() + "]. State ["
        + handle.getState() + "]");
    while (!handle.getState().isFinal()) {
      System.out.println(" state is not final yet");
      System.out.println(" sleeping for a second");
      try {
        Thread.sleep(1000L);
      } catch (InterruptedException e) {
      }
    }
  }

  @Override
  public void infoChanged(SparkAppHandle handle) {
    System.out.println("Spark App Id [" + handle.getAppId() + "] State Changed. State ["
        + handle.getState() + "]");
  }
}

I have set all the required properties and I'm able to submit and run Spark
jobs successfully. Any pointers would be really helpful.

Thanks again!


Tariq, Mohammad
about.me/mti

On Tue, Nov 8, 2016 at 5:16 AM, Marcelo Vanzin  wrote:

> Then you need to look at your logs to figure out why the child app is not
> working. "startApplication" will by default redirect the child's output to
> the parent's logs.
>
> On Mon, Nov 7, 2016 at 3:42 PM, Mohammad Tariq  wrote:
>
>> Hi Marcelo,
>>
>> Thank you for the prompt response. I tried adding listeners as well, but
>> that didn't work either. Looks like it isn't starting the job at all.
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>>
>> On Tue, Nov 8, 2016 at 5:06 AM, Marcelo Vanzin 
>> wrote:
>>
>>> On Mon, Nov 7, 2016 at 3:29 PM, Mohammad Tariq 
>>> wrote:
>>> > I have been trying to use SparkLauncher.startApplication() to launch
>>> > a Spark app from within Java code, but have been unable to do so.
>>> > However, the same piece of code works if I use SparkLauncher.launch().
>>> >
>>> > Here are the corresponding code snippets:
>>> >
>>> > SparkAppHandle handle = new SparkLauncher()
>>> >     .setSparkHome("/Users/miqbal1/DISTRIBUTED_WORLD/UNPACKED/spark-1.6.1-bin-hadoop2.6")
>>> >     .setJavaHome("/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home")
>>> >     .setAppResource("/Users/miqbal1/wc.jar")
>>> >     .setMainClass("org.myorg.WC")
>>> >     .setMaster("local")
>>> >     .setConf("spark.dynamicAllocation.enabled", "true")
>>> >     .startApplication();
>>> >
>>> > System.out.println(handle.getAppId());
>>> > System.out.println(handle.getState());
>>> >
>>> > This prints null and UNKNOWN as output.
>>>
>>> The information you're printing is not available immediately after you
>>> call "startApplication()". The Spark app is still starting, so it may
>>> take some time for the app ID and other info to be reported back. The
>>> "startApplication()" method allows you to provide listeners you can
>>> use to know when that information is available.
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>
>
> --
> Marcelo
>


Re: Correct SparkLauncher usage

2016-11-07 Thread Marcelo Vanzin
Then you need to look at your logs to figure out why the child app is not
working. "startApplication" will by default redirect the child's output to
the parent's logs.
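
As an aside (a sketch of mine, assuming Spark 1.6+; paths and names are
placeholders): the launcher writes the child's output through
java.util.logging, and the logger name can be pinned via the
spark.launcher.childProcLoggerName config so it is easy to locate in the
parent's logs:

import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class ChildLogExample {
  public static void main(String[] args) throws Exception {
    // Pin down the j.u.l logger name the launcher uses for child output.
    Logger childLog = Logger.getLogger("myapp.spark-child"); // placeholder
    childLog.setLevel(Level.ALL);

    SparkAppHandle handle = new SparkLauncher()
        .setAppResource("/path/to/app.jar")   // placeholder
        .setMainClass("org.example.Main")     // placeholder
        .setMaster("local")
        .setConf(SparkLauncher.CHILD_PROCESS_LOGGER_NAME, "myapp.spark-child")
        .startApplication();
    // Child stdout/stderr now shows up under the "myapp.spark-child" logger.
  }
}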

On Mon, Nov 7, 2016 at 3:42 PM, Mohammad Tariq  wrote:

> Hi Marcelo,
>
> Thank you for the prompt response. I tried adding listeners as well, but
> that didn't work either. Looks like it isn't starting the job at all.
>
>
> Tariq, Mohammad
> about.me/mti
>
> On Tue, Nov 8, 2016 at 5:06 AM, Marcelo Vanzin 
> wrote:
>
>> On Mon, Nov 7, 2016 at 3:29 PM, Mohammad Tariq 
>> wrote:
>> > I have been trying to use SparkLauncher.startApplication() to launch a
>> > Spark app from within Java code, but have been unable to do so. However,
>> > the same piece of code works if I use SparkLauncher.launch().
>> >
>> > Here are the corresponding code snippets:
>> >
>> > SparkAppHandle handle = new SparkLauncher()
>> >     .setSparkHome("/Users/miqbal1/DISTRIBUTED_WORLD/UNPACKED/spark-1.6.1-bin-hadoop2.6")
>> >     .setJavaHome("/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home")
>> >     .setAppResource("/Users/miqbal1/wc.jar")
>> >     .setMainClass("org.myorg.WC")
>> >     .setMaster("local")
>> >     .setConf("spark.dynamicAllocation.enabled", "true")
>> >     .startApplication();
>> >
>> > System.out.println(handle.getAppId());
>> > System.out.println(handle.getState());
>> >
>> > This prints null and UNKNOWN as output.
>>
>> The information you're printing is not available immediately after you
>> call "startApplication()". The Spark app is still starting, so it may
>> take some time for the app ID and other info to be reported back. The
>> "startApplication()" method allows you to provide listeners you can
>> use to know when that information is available.
>>
>> --
>> Marcelo
>>
>
>


-- 
Marcelo


Re: Correct SparkLauncher usage

2016-11-07 Thread Mohammad Tariq
Hi Marcelo,

Thank you for the prompt response. I tried adding listeners as well, but
that didn't work either. Looks like it isn't starting the job at all.


Tariq, Mohammad
about.me/mti

On Tue, Nov 8, 2016 at 5:06 AM, Marcelo Vanzin  wrote:

> On Mon, Nov 7, 2016 at 3:29 PM, Mohammad Tariq  wrote:
> > I have been trying to use SparkLauncher.startApplication() to launch a
> > Spark app from within Java code, but have been unable to do so. However,
> > the same piece of code works if I use SparkLauncher.launch().
> >
> > Here are the corresponding code snippets:
> >
> > SparkAppHandle handle = new SparkLauncher()
> >     .setSparkHome("/Users/miqbal1/DISTRIBUTED_WORLD/UNPACKED/spark-1.6.1-bin-hadoop2.6")
> >     .setJavaHome("/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home")
> >     .setAppResource("/Users/miqbal1/wc.jar")
> >     .setMainClass("org.myorg.WC")
> >     .setMaster("local")
> >     .setConf("spark.dynamicAllocation.enabled", "true")
> >     .startApplication();
> >
> > System.out.println(handle.getAppId());
> > System.out.println(handle.getState());
> >
> > This prints null and UNKNOWN as output.
>
> The information you're printing is not available immediately after you
> call "startApplication()". The Spark app is still starting, so it may
> take some time for the app ID and other info to be reported back. The
> "startApplication()" method allows you to provide listeners you can
> use to know when that information is available.
>
> --
> Marcelo
>


Re: Correct SparkLauncher usage

2016-11-07 Thread Marcelo Vanzin
On Mon, Nov 7, 2016 at 3:29 PM, Mohammad Tariq  wrote:
> I have been trying to use SparkLauncher.startApplication() to launch a Spark
> app from within Java code, but have been unable to do so. However, the same
> piece of code works if I use SparkLauncher.launch().
>
> Here are the corresponding code snippets:
>
> SparkAppHandle handle = new SparkLauncher()
>     .setSparkHome("/Users/miqbal1/DISTRIBUTED_WORLD/UNPACKED/spark-1.6.1-bin-hadoop2.6")
>     .setJavaHome("/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home")
>     .setAppResource("/Users/miqbal1/wc.jar")
>     .setMainClass("org.myorg.WC")
>     .setMaster("local")
>     .setConf("spark.dynamicAllocation.enabled", "true")
>     .startApplication();
>
> System.out.println(handle.getAppId());
> System.out.println(handle.getState());
>
> This prints null and UNKNOWN as output.

The information you're printing is not available immediately after you
call "startApplication()". The Spark app is still starting, so it may
take some time for the app ID and other info to be reported back. The
"startApplication()" method allows you to provide listeners you can
use to know when that information is available.
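
To make that concrete (a minimal sketch, not from the original reply; the
jar path and main class are placeholders), the listener can be registered
in the startApplication() call itself:

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class ListenerExample {
  public static void main(String[] args) throws Exception {
    new SparkLauncher()
        .setAppResource("/path/to/app.jar")   // placeholder
        .setMainClass("org.example.Main")     // placeholder
        .setMaster("local")
        .startApplication(new SparkAppHandle.Listener() {
          @Override
          public void stateChanged(SparkAppHandle h) {
            // getAppId() stays null until the child reports back.
            System.out.println("State: " + h.getState()
                + ", app id: " + h.getAppId());
          }

          @Override
          public void infoChanged(SparkAppHandle h) {}
        });
    // A real program would now wait (e.g. on a CountDownLatch) for a
    // final state instead of exiting immediately.
  }
}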

-- 
Marcelo
