I run Spark in yarn-cluster mode, and yes, log aggregation is enabled. In
the Yarn aggregated logs I can see the job status correctly.

The issue is that the Yarn Client logs (which are written to stdout in the
terminal) state that the job has succeeded even though the job has failed.

As a user, I am not testing whether the Yarn RM successfully cleared all
resources, but the Spark job's final status.

If the Spark job fails, then the Spark Job server does not give any useful
information about the job failure.

As Marcelo stated, it is a bug in Spark Python jobs (PythonRunner.scala,
maybe). I've created a bug report: SPARK-9416
<https://issues.apache.org/jira/browse/SPARK-9416>.

@Marcelo
*Question 1*:
Do you know why, when launching a Spark job through SparkLauncher in Java, the
stdout logs (i.e., INFO yarn.Client) are written to the error stream
(spark.getErrorStream()) instead of the output stream?

The output stream (spark.getInputStream()) is always empty.

Process spark = new SparkLauncher()
        .setSparkHome("/home/edadashov/tools/myspark/spark")
        .setAppName(sparkScriptName)
        .setMaster("yarn-cluster")
        .setAppResource(sparkScriptPath.toString())
        .addAppArgs(params)
        .addPyFile(sparkScriptPath.toString())
        .addPyFile(dependencyPackagePath.toString())
        .launch();

Then I need to drain/read the buffered streams so that the process does not
enter a deadlock. According to the Oracle doc
<https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getOutputStream-->
:

"By default, the created subprocess does not have its own terminal or
console. All its standard I/O (i.e. stdin, stdout, stderr) operations will
be redirected to the parent process, where they can be accessed via the
streams obtained using the methods getOutputStream(), getInputStream(), and
getErrorStream(). The parent process uses these streams to feed input to
and get output from the subprocess. Because some native platforms only
provide limited buffer size for standard input and output streams, failure
to promptly write the input stream or read the output stream of the
subprocess may cause the subprocess to block, or even deadlock."
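
To avoid that, I drain both streams on background threads. Here is a minimal
sketch of what I mean (the StreamDrainer class and the "[stdout]"/"[stderr]"
prefixes are just illustrative names of mine, not anything from SparkLauncher
itself):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

// Drains one stream of the child process on its own thread so the child
// cannot block on a full pipe buffer.
final class StreamDrainer implements Runnable {
    private final InputStream in;
    private final String prefix;

    StreamDrainer(InputStream in, String prefix) {
        this.in = in;
        this.prefix = prefix;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(prefix + line); // or hand each line to a logger
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// Usage after launch():
// new Thread(new StreamDrainer(spark.getInputStream(), "[stdout] ")).start();
// new Thread(new StreamDrainer(spark.getErrorStream(), "[stderr] ")).start();
// int exitCode = spark.waitFor(); // exit code of spark-submit, not the Spark job's final status

In practice all of the INFO yarn.Client lines show up on the error stream
drainer, which is what prompted Question 1 above.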

*Question 2*:

What is the best way to know about the Spark job's progress & final status in
Java?
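
The only approach I can think of so far is to ask the ResourceManager directly.
Below is a rough sketch of that idea (my own assumption of one possible way, not
a confirmed answer): it polls YarnClient.getApplicationReport() for progress and
the final status, assuming the application id string has been scraped from the
yarn.Client log lines and that yarn-site.xml is on the classpath. Because of the
PySpark bug above, the final status it reports can still be a misleading
SUCCEEDED.

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class YarnStatusPoller {
    public static void main(String[] args) throws Exception {
        // e.g. "application_1437612288327_0013", taken from the yarn.Client log lines
        ApplicationId appId = ConverterUtils.toApplicationId(args[0]);

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();
        try {
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            // Poll until the application leaves the running states
            while (report.getYarnApplicationState() != YarnApplicationState.FINISHED
                    && report.getYarnApplicationState() != YarnApplicationState.FAILED
                    && report.getYarnApplicationState() != YarnApplicationState.KILLED) {
                System.out.printf("state=%s progress=%.0f%%%n",
                        report.getYarnApplicationState(), report.getProgress() * 100);
                Thread.sleep(2000);
                report = yarnClient.getApplicationReport(appId);
            }
            // Note: for the PySpark bug discussed above, this can still say
            // SUCCEEDED even though the job failed (see SPARK-9416).
            FinalApplicationStatus finalStatus = report.getFinalApplicationStatus();
            System.out.println("final status: " + finalStatus);
        } finally {
            yarnClient.stop();
        }
    }
}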

Thanks.



On Tue, Jul 28, 2015 at 1:17 PM, Corey Nolet <cjno...@gmail.com> wrote:

>
>
> On Tue, Jul 28, 2015 at 2:17 PM, Elkhan Dadashov <elkhan8...@gmail.com>
> wrote:
>
>> Thanks Corey for your answer,
>>
>> Do you mean that "final status : SUCCEEDED" in the terminal logs means that
>> the YARN RM could clean up the resources after the application has finished
>> (the application finishing does not necessarily mean it succeeded or failed)?
>>
> Correct.
>
>
>> With that logic it totally makes sense.
>>
>> Basically the YARN logs do not say anything about the Spark job itself.
>> They just say that the Spark job's resources have been cleaned up and
>> returned back to Yarn after the job completed.
>>
>
> If you have log aggregation enabled on your cluster, the "yarn logs"
> command should give you any exceptions that were thrown in the driver /
> executors when you are running in yarn-cluster mode. If you were running in
> yarn-client mode, you'd see the errors that caused a job to fail in your
> local log (errors that would cause a job to fail will be caught by the
> SparkContext on the driver) because the driver is running locally instead
> of being deployed in a yarn container. Also, using the Spark HistoryServer
> will give you a more visual insight into the exact problems (like which
> partitions failed, which executors died trying to process them, etc...)
>
>
>>
>> It would be great if the Yarn logs could also say something about the
>> outcome of the job, because the user is more interested in the job's final
>> status.
>>
>
> This is just an artifact of running with yarn-cluster mode. It's still
> easy enough to run the "yarn logs" command to see all the logs (you can grep
> for the node designated as the application master to find any exceptions in
> your driver that may show you why your application failed). The
> HistoryServer would still give you enough information after the fact to see
> the failures.
>
> Generally, I submit my jobs in yarn-client mode while I'm testing so that
> I can spot errors right away. I generally only use yarn-cluster mode for
> jobs that are deployed onto operational hardware; that way, if a job does
> fail, I can still use "yarn logs" to find out why, but I don't need a local
> process running on the machine that submitted the job taking up resources
> (see the waitForAppCompletion property introduced in Spark 1.4).
>
> I'll also caveat my response and say that I have not used Spark's Python
> API so I can only give you a general overview of how the Yarn integration
> works from the Scala point of view.
>
>
> Hope this helps.
>
>
>> Yarn-related logs can be found in detail in the RM, NM, DN, and NN log files.
>>
>> Thanks again.
>>
>> On Mon, Jul 27, 2015 at 7:45 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> Elkhan,
>>>
>>> What does the ResourceManager say about the final status of the job?
>>> Spark jobs that run as Yarn applications can fail but still successfully
>>> clean up their resources and give them back to the Yarn cluster. Because of
>>> this, there's a difference between your code throwing an exception in an
>>> executor/driver and the Yarn application failing. Generally you'll see a
>>> Yarn application fail when there's a memory problem (too much memory being
>>> allocated, or not enough, causing executors to fail multiple times and
>>> preventing your job from finishing).
>>>
>>> What I'm seeing from your post is that you had an exception in your
>>> application which was caught by the Spark framework, which then proceeded to
>>> clean up the job and shut itself down, which it did successfully. When you
>>> aren't running in the Yarn modes, you aren't seeing any Yarn status telling
>>> you the Yarn application was successfully shut down; you are just seeing
>>> the failure(s) from your drivers/executors.
>>>
>>>
>>>
>>> On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov <elkhan8...@gmail.com>
>>> wrote:
>>>
>>>> Any updates on this bug ?
>>>>
>>>> Why do the Spark log results & job final status not match? (one saying
>>>> that the job has failed, the other stating that the job has succeeded)
>>>>
>>>> Thanks.
>>>>
>>>>
>>>> On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov <elkhan8...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> While running the Spark word count Python example with an intentional
>>>>> mistake in *Yarn cluster mode*, the Spark terminal output states the
>>>>> final status as SUCCEEDED, but the log files show the correct result,
>>>>> indicating that the job failed.
>>>>>
>>>>> Why do the terminal log output & the application log output contradict
>>>>> each other?
>>>>>
>>>>> If I run the same job in *local mode*, then the terminal logs and
>>>>> application logs match; both state that the job has failed due to the
>>>>> expected error in the Python script.
>>>>>
>>>>> More details: Scenario
>>>>>
>>>>> While running the Spark word count Python example in *Yarn cluster mode*,
>>>>> if I make an intentional error in wordcount.py by changing this line (I'm
>>>>> using Spark 1.4.1, but this problem also exists in Spark 1.4.0 and 1.3.0,
>>>>> which I tested):
>>>>>
>>>>> lines = sc.textFile(sys.argv[1], 1)
>>>>>
>>>>> into this line:
>>>>>
>>>>> lines = sc.textFile(*nonExistentVariable*,1)
>>>>>
>>>>> where the nonExistentVariable variable was never created or initialized.
>>>>>
>>>>> then I run that example with this command (I put README.md into HDFS
>>>>> before running this command):
>>>>>
>>>>> *./bin/spark-submit --master yarn-cluster wordcount.py /README.md*
>>>>>
>>>>> The job runs and finishes successfully according to the log printed in
>>>>> the terminal:
>>>>> *Terminal logs*:
>>>>> ...
>>>>> 15/07/23 16:19:17 INFO yarn.Client: Application report for
>>>>> application_1437612288327_0013 (state: RUNNING)
>>>>> 15/07/23 16:19:18 INFO yarn.Client: Application report for
>>>>> application_1437612288327_0013 (state: RUNNING)
>>>>> 15/07/23 16:19:19 INFO yarn.Client: Application report for
>>>>> application_1437612288327_0013 (state: RUNNING)
>>>>> 15/07/23 16:19:20 INFO yarn.Client: Application report for
>>>>> application_1437612288327_0013 (state: RUNNING)
>>>>> 15/07/23 16:19:21 INFO yarn.Client: Application report for
>>>>> application_1437612288327_0013 (state: FINISHED)
>>>>> 15/07/23 16:19:21 INFO yarn.Client:
>>>>>  client token: N/A
>>>>>  diagnostics: Shutdown hook called before final status was reported.
>>>>>  ApplicationMaster host: 10.0.53.59
>>>>>  ApplicationMaster RPC port: 0
>>>>>  queue: default
>>>>>  start time: 1437693551439
>>>>>  final status: *SUCCEEDED*
>>>>>  tracking URL:
>>>>> http://localhost:8088/proxy/application_1437612288327_0013/history/application_1437612288327_0013/1
>>>>>  user: edadashov
>>>>> 15/07/23 16:19:21 INFO util.Utils: Shutdown hook called
>>>>> 15/07/23 16:19:21 INFO util.Utils: Deleting directory
>>>>> /tmp/spark-eba0a1b5-a216-4afa-9c54-a3cb67b16444
>>>>>
>>>>> But if I look at the log files generated for this application in HDFS,
>>>>> they indicate failure of the job with the correct reason:
>>>>> *Application log files*:
>>>>> ...
>>>>> \00 stdout\00 179Traceback (most recent call last):
>>>>>   File "wordcount.py", line 32, in <module>
>>>>>     lines = sc.textFile(nonExistentVariable,1)
>>>>> *NameError: name 'nonExistentVariable' is not defined*
>>>>>
>>>>>
>>>>> Why is the terminal output (final status: *SUCCEEDED*) not matching the
>>>>> application log results, which show the failure of the job (NameError:
>>>>> name 'nonExistentVariable' is not defined)?
>>>>>
>>>>> Is this a bug? Is there a Jira ticket related to this issue? (Is someone
>>>>> assigned to it?)
>>>>>
>>>>> If I run this wordcount.py example (with the mistaken line) in local mode,
>>>>> then the terminal logs also state that the job has failed.
>>>>>
>>>>> *./bin/spark-submit wordcount.py /README.md*
>>>>>
>>>>> *Terminal logs*:
>>>>>
>>>>> ...
>>>>> 15/07/23 16:31:55 INFO scheduler.EventLoggingListener: Logging events
>>>>> to hdfs:///app-logs/local-1437694314943
>>>>> Traceback (most recent call last):
>>>>>   File "/home/edadashov/tools/myspark/spark/wordcount.py", line 32, in
>>>>> <module>
>>>>>     lines = sc.textFile(nonExistentVariable,1)
>>>>> NameError: name 'nonExistentVariable' is not defined
>>>>> 15/07/23 16:31:55 INFO spark.SparkContext: Invoking stop() from
>>>>> shutdown hook
>>>>>
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Best regards,
>>>> Elkhan Dadashov
>>>>
>>>
>>>
>>
>>
>> --
>>
>> Best regards,
>> Elkhan Dadashov
>>
>
>


-- 

Best regards,
Elkhan Dadashov
