Hey Mohammad,

I implemented the code using CountDownLatch, and SparkLauncher works as
expected. Hope it helps.

Whenever appHandle.getState() reaches one of the final states, the
countDownLatch is counted down and execution returns to the main program.


...
final CountDownLatch countDownLatch = new CountDownLatch(1);

SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);

SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);

Thread sparkAppListenerThread = new Thread(sparkAppListener);

// A more generic version of the 3 lines above, in the 3 commented lines below

// SparkAppHandle.Listener sparkAppListener = new SparkAppListener(countDownLatch); // use a (factory kind of) getter method

// SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);

// Thread sparkAppListenerThread = new Thread((Runnable) sparkAppListener);

sparkAppListenerThread.start();

long timeout = 120;
// await() returns false if the timeout elapses before a final state is reported
countDownLatch.await(timeout, TimeUnit.SECONDS);
...

    private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {
        private static final Log log = LogFactory.getLog(SparkAppListener.class);
        private final CountDownLatch countDownLatch;

        public SparkAppListener(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void stateChanged(SparkAppHandle handle) {
            String sparkAppId = handle.getAppId();
            State appState = handle.getState();
            // SPARK_STATE_MSG is defined elsewhere in the class (not shown)
            if (sparkAppId != null) {
                log.info("Spark job with app id: " + sparkAppId + ",\tState changed to: " + appState + " - "
                        + SPARK_STATE_MSG.get(appState));
            } else {
                log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
            }
            // Release the waiting main thread once the application reaches a final state
            if (appState != null && appState.isFinal()) {
                countDownLatch.countDown();
            }
        }

        @Override
        public void infoChanged(SparkAppHandle handle) {}

        // No-op: the state updates arrive through stateChanged(), not through this thread
        @Override
        public void run() {}
    }
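
In case it helps to try the latch pattern without a cluster, here is a
minimal, self-contained sketch of the same idea. The State enum, the Listener
interface and the awaitFinalState helper are hypothetical stand-ins I made up
for illustration; they are not the Spark API. A background thread plays the
role of the launcher and reports states to the listener, while the caller
blocks on the latch with a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchListenerSketch {

    /** Hypothetical stand-in for SparkAppHandle.State; only isFinal() matters here. */
    public enum State {
        SUBMITTED(false), RUNNING(false), FINISHED(true);

        private final boolean terminal;

        State(boolean terminal) {
            this.terminal = terminal;
        }

        public boolean isFinal() {
            return terminal;
        }
    }

    /** Hypothetical stand-in for SparkAppHandle.Listener. */
    public interface Listener {
        void stateChanged(State newState);
    }

    /** Counts the latch down once a final state is reported; it never loops or blocks. */
    static class LatchListener implements Listener {
        private final CountDownLatch latch;

        LatchListener(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void stateChanged(State newState) {
            if (newState != null && newState.isFinal()) {
                latch.countDown();
            }
        }
    }

    /**
     * Reports the given states to a listener from a background thread, then waits
     * up to timeoutMillis. Returns true if a final state arrived before the timeout.
     */
    public static boolean awaitFinalState(State[] states, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        Listener listener = new LatchListener(latch);

        // This thread only simulates whoever delivers the state updates.
        Thread reporter = new Thread(() -> {
            for (State s : states) {
                listener.stateChanged(s);
            }
        });
        reporter.setDaemon(true);
        reporter.start();

        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        State[] completes = {State.SUBMITTED, State.RUNNING, State.FINISHED};
        State[] neverCompletes = {State.SUBMITTED, State.RUNNING};
        System.out.println("final state reached: " + awaitFinalState(completes, 2000));
        System.out.println("final state reached: " + awaitFinalState(neverCompletes, 200));
    }
}
```

The first call should return true because a final state is reported; the
second should return false because the latch is never counted down and the
timeout expires. Checking the return value of await() is how the main program
can tell a finished job from a timeout.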


On Thu, Nov 10, 2016 at 3:08 PM Mohammad Tariq <donta...@gmail.com> wrote:

> Sure, will look into the tests.
>
> Thank you so much for your time!
>
>
>
> Tariq, Mohammad
> about.me/mti
>
>
> On Fri, Nov 11, 2016 at 4:35 AM, Marcelo Vanzin <van...@cloudera.com>
> wrote:
>
> Sorry, it's kinda hard to give any more feedback from just the info you
> provided.
>
> I'd start with some working code like this from Spark's own unit tests:
>
> https://github.com/apache/spark/blob/a8ea4da8d04c1ed621a96668118f20739145edd2/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala#L164
>
>
> On Thu, Nov 10, 2016 at 3:00 PM, Mohammad Tariq <donta...@gmail.com>
> wrote:
>
> All I want to do is submit a job, keep getting its state as soon as it
> changes, and exit once the job is over. I'm sorry to be a pest with
> questions. I'm having a bit of a tough time making this work.
>
>
>
> On Fri, Nov 11, 2016 at 4:27 AM, Mohammad Tariq <donta...@gmail.com>
> wrote:
>
> Yeah, that definitely makes sense. I was just trying to make it work
> somehow. The problem is that it's not calling the listeners at all, hence
> I'm unable to do anything. I just wanted to cross-check it by looping
> inside. But I get the point. Thank you for that!
>
> I'm on YARN (cluster mode).
>
>
>
> On Fri, Nov 11, 2016 at 4:19 AM, Marcelo Vanzin <van...@cloudera.com>
> wrote:
>
> On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq <donta...@gmail.com>
> wrote:
> >   @Override
> >   public void stateChanged(SparkAppHandle handle) {
> >     System.out.println("Spark App Id [" + handle.getAppId() + "]. State [" + handle.getState() + "]");
> >     while(!handle.getState().isFinal()) {
>
> You shouldn't loop in an event handler. That's not really how
> listeners work. Instead, use the event handler to update some local
> state, or signal some thread that's waiting for the state change.
>
> Also be aware that handles currently only work in local and yarn
> modes; the state updates haven't been hooked up to standalone mode
> (maybe for client mode, but definitely not cluster) nor mesos.
>
> --
> Marcelo
>
