Re: [DISCUSS] Enable blacklisting feature by default in 3.0

2019-04-02 Thread Ankur Gupta
Hi Steve,

Thanks for your feedback. From your email, I could gather the following two
important points:

   1. Report failures to something (cluster manager) which can opt to
   destroy the node and request a new one
   2. Pluggable failure detection algorithms

Regarding #1, current blacklisting implementation does report blacklist
status to Yarn here
<https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala#L126>,
which can choose to take appropriate action based on failures across
different applications (though it seems it doesn't currently). This doesn't
work with static allocation, though, nor with other cluster managers. Those
issues are still open:

   - https://issues.apache.org/jira/browse/SPARK-24016
   - https://issues.apache.org/jira/browse/SPARK-19755
   - https://issues.apache.org/jira/browse/SPARK-23485

Regarding #2, that is a good point but I think that is optional and may not
be tied to enabling the blacklisting feature in the current form.

Coming back to the concerns raised by Reynold, Chris and Steve, it seems
that there are a few tasks that we need to complete before we decide
to enable blacklisting by default in its current form:

   1. Avoid resource starvation because of blacklisting
   2. Use exponential backoff for blacklisting instead of a configurable
   threshold
   3. Report blacklisting status to all cluster managers (I am not sure if
   this is necessary to move forward though)

Thanks for all the feedback. Please let me know if there are other concerns
that we would like to resolve before enabling blacklisting.

Thanks,
Ankur

On Tue, Apr 2, 2019 at 2:45 AM Steve Loughran 
wrote:

>
>
> On Fri, Mar 29, 2019 at 6:18 PM Reynold Xin  wrote:
>
>> We tried enabling blacklisting for some customers and in the cloud, very
>> quickly they end up having 0 executors due to various transient errors. So
>> unfortunately I think the current implementation is terrible for cloud
>> deployments, and shouldn't be on by default. The heart of the issue is that
>> the current implementation is not great at dealing with transient errors vs
>> catastrophic errors.
>>
>
> +1.
>
> It contains the assumption that "Blacklisting is the solution", when
> really "reporting to something which can opt to destroy the node and
> request a new one" is better
>
> Having some way to report those failures to a monitor like that can
> combine app-level failure detection with cloud-infra reaction.
>
> It's also interesting to look at more complex failure evaluators, where
> the Φ Accrual Failure Detector is an interesting option:
>
>
> http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.80.7427&rep=rep1&type=pdf
>
> Apparently you can use this with Akka:
> https://manuel.bernhardt.io/2017/07/26/a-new-adaptive-accrual-failure-detector-for-akka/
>
> again, making this something where people can experiment with algorithms
> is a nice way to let interested parties explore the options in different
> environments
>
>
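
Concretely, the φ accrual detector turns "is this node dead?" into a
continuous suspicion value derived from observed heartbeat inter-arrival
times. A minimal sketch, assuming those intervals are modeled as roughly
normal (the commons-math3 usage here is just one convenient way to get a
CDF, not anything Spark ships today):

import org.apache.commons.math3.distribution.NormalDistribution

// phi = -log10(probability that a healthy node's heartbeat would be this
// late), given the observed mean/stddev of inter-arrival times. Higher phi
// means stronger suspicion; a monitor compares it against a threshold.
def phi(msSinceLastHeartbeat: Double, meanMs: Double, stdDevMs: Double): Double = {
  val model = new NormalDistribution(meanMs, math.max(stdDevMs, 1e-6))
  val pLater = 1.0 - model.cumulativeProbability(msSinceLastHeartbeat)
  -math.log10(math.max(pLater, Double.MinPositiveValue))
}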


Re: [DISCUSS] Enable blacklisting feature by default in 3.0

2019-04-01 Thread Ankur Gupta
Thanks for your thoughts Chris! Please find my response below:

- Rather than a fixed timeout, could we do some sort of exponential
backoff? Start with a 10 or 20 second blacklist and increase from there?
The nodes with catastrophic errors should quickly hit long blacklist
intervals.
- +1 I like this idea. This will have some additional costs with respect to
tracking interval for each executor/node but it will certainly be very
useful.
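
A minimal sketch of the backoff computation being proposed; the base and
cap values, and the names, are illustrative rather than existing Spark
configuration keys:

// Double the blacklist interval on each successive failure, up to a cap.
def blacklistTimeoutMs(failureCount: Int,
                       baseMs: Long = 20000L,                   // ~20s first offence
                       maxMs: Long = 60L * 60L * 1000L): Long = { // 1 hour cap
  val factor = math.pow(2.0, math.max(0, failureCount - 1).toDouble)
  math.min((baseMs * factor).toLong, maxMs)
}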

- Correct me if I'm wrong, but once a task fails on an executor, even if
maxTaskAttemptsPerExecutor > 1, that executor will get a failed task count
against it. It looks like "TaskSetBlacklist.updateBlacklistForFailedTask"
only adds to the executor failures. If the task recovers on the second
attempt on the same executor, there is no way to remove the failure. I'd
argue that if the task succeeds on a second attempt on the same executor,
then it is definitely transient and the first attempt's failure should not
count towards the executor's total stage/application failure count.
- I am not sure about this. I think the purpose of blacklisting is to find
nodes with transient failures as well and blacklist them for a short period
of time to avoid re-computation. So, it will be useful to count a failure
against an executor even if it successfully recovered from that failure
later on. And with the exponential backoff, blacklisting will be transient
in nature so it will not be a huge penalty, if that failure was truly
transient.

- W.r.t turning it on by default: Do we have a sense of how many teams are
using blacklisting today using the current default settings? It may be
worth changing the defaults for a release or two and gather feedback to
help make a call on turning it on by default. We could potentially get that
feedback now: two question survey "Have you enabled blacklisting?" and
"What settings did you use?"
- I think this email was intended for that purpose. Additionally, from the
comments on my PR: https://github.com/apache/spark/pull/24208, it seems
some teams have that enabled by default already.

On Mon, Apr 1, 2019 at 3:08 PM Chris Stevens 
wrote:

> Hey Ankur,
>
> I think the significant decrease in "spark.blacklist.timeout" (1 hr down
> to 5 minutes) in your updated suggestion is the key here.
>
> Looking at a few *successful* runs of the application I was debugging,
> here are the error rates when I did *not* have blacklisting enabled:
>
> Run A: 8 executors with 36 total errors over the last 25 minutes of a 1
> hour and 6 minute run.
> Run B: 8 executors with 50 total errors over the last 30 minutes of a 1
> hour run.
>
> Increasing "spark.blacklist.application.maxFailedTasksPerExecutor" to 5
> would have allowed run A (~3 failures/executor) to pass, but run B (~6
> failures/executor) would not have without the change to
> "spark.blacklist.timeout".
>
> With such a small timeout of 5 minutes, the worst you get is executors
> flipping between blacklisted and not blacklisted (e.g. fail 5 tasks quickly
> due to disk failures, wait 5 minutes, fail 5 tasks quickly, wait 5
> minutes). For catastrophic errors, this is probably OK. The executor will
> fail fast each time it comes back online and will effectively be
> blacklisted 90+% of the time. For transient errors, the executor will come
> back online and probably be fine. The only trouble you get into is if you
> run out of executors for a stage due to a high amount of transient errors,
> but you're right, perhaps that many transient errors is something worth
> failing for.
>
> In the case I was debugging with fetch failures, only the 5 minute timeout
> applies, but I don't think it would have mattered. Fetch task attempts were
> "hanging" for 30+ minutes without failing (it took that long for the netty
> channel to timeout). As such, there was no opportunity to blacklist. Even
> reducing the number of fetch retry attempts didn't help, as the first
> attempt occasionally stalled due to the underlying networking issues.
>
> A few thoughts:
> - Correct me if I'm wrong, but once a task fails on an executor, even if
> maxTaskAttemptsPerExecutor > 1, that executor will get a failed task count
> against it. It looks like "TaskSetBlacklist.updateBlacklistForFailedTask"
> only adds to the executor failures. If the task recovers on the second
> attempt on the same executor, there is no way to remove the failure. I'd
> argue that if the task succeeds on a second attempt on the same executor,
> then it is definitely transient and the first attempt's failure should not
> count towards the executor's total stage/application failure count.
> - Rather than a fixed timeout, could we do some sort of exponential
> backoff? Start with a 10 or 20 second blacklist and increase from there?
> The nodes with cat

Re: [DISCUSS] Enable blacklisting feature by default in 3.0

2019-04-01 Thread Ankur Gupta
Hi Chris,

Thanks for sending over the example. As far as I can understand, it seems
that this would not have been a problem if
"spark.blacklist.application.maxFailedTasksPerExecutor" was set to a higher
threshold, as mentioned in my previous email.

Though, with 8/7 executors and 2 failedTasksPerExecutor, if the application
runs out of executors, that would imply at least 14 task failures in a
short period of time. So, I am not sure if the application should still
continue to run or fail. If this was not a transient issue, maybe failing
was the correct outcome, as it saves a lot of unnecessary computation and
also alerts admins to look for transient/permanent hardware failures.

Please let me know if you think we should enable the blacklisting feature
by default with the higher thresholds.

Thanks,
Ankur

On Fri, Mar 29, 2019 at 3:23 PM Chris Stevens 
wrote:

> Hey All,
>
> My initial reply got lost, because I wasn't on the dev list. Hopefully
> this goes through.
>
> Back story for my experiments: customer was hitting network errors due to
> cloud infrastructure problems. Basically, executor X couldn't fetch from Y.
> The NIC backing the VM for executor Y was swallowing packets. I wanted to
> blacklist node Y.
>
> What I learned:
>
> 1. `spark.blacklist.application.fetchFailure.enabled` requires
> `spark.blacklist.enabled` to also be enabled (BlacklistTracker isn't created
> without the latter; see
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L948).
> This was a problem because the defaults for
> `spark.blacklist.[task|stage|application].*` are aggressive and don't even
> apply to fetch failures. Those are always treated as non-transient. It
> would be nice to have fetch blacklisting without regular blacklisting.
>
> 2. Due to the conf coupling in #1 and transient cloud storage errors in
> the job (FileScanRDD was failing due to corrupted files), I had to set the
> `max*PerExecutor` and `max*PerNode` to really high values (i.e. 1000).
> Without these high settings, the customer was running out of nodes on the
> cluster (as we don't have blacklisting enabled by default, we haven't
> hooked it up to any sort of dynamic cloud VM re-provisioning - something
> like `killBlacklistedNodes`). Why? The same transient FileScanRDD failure
> hit over multiple stages, so even though executors were aggressively
> removed within one
> stage, `spark.blacklist.application.maxFailedTasksPerExecutor = 2` was
> reached. The stages were succeeding because the FileScanRDD attempts on
> other executors succeeded. As such, the 8 node cluster ran out of executors
> after 3 stages. I did not have `spark.blacklist.killBlacklistedExecutors` enabled.
> If I did, then `spark.blacklist.application.maxFailedExecutorsPerNode`
> would have kicked in and the job might have failed after 4-6 stages,
> depending on how it played out. (FWIW, this was running one executor per
> node).
>
> -Chris
>
> On Fri, Mar 29, 2019 at 1:48 PM Ankur Gupta 
> wrote:
>
>> Thanks Reynold! That is certainly useful to know.
>>
>> @Chris Will it be possible for you to send out those details if you still
>> have them or, better, create a JIRA so someone can work on those
>> improvements? If there is already a JIRA, can you please provide a link to
>> it?
>>
>> Additionally, if the concern is with the aggressiveness of the
>> blacklisting, then we can enable blacklisting feature by default with
>> higher thresholds for failures. Below is an alternate set of defaults that
>> were also proposed in the design document for max cluster utilization:
>>
>>1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 2
>>2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>3. spark.blacklist.stage.maxFailedTasksPerExecutor = 5
>>4. spark.blacklist.stage.maxFailedExecutorsPerNode = 4
>>5. spark.blacklist.application.maxFailedTasksPerExecutor = 5
>>6. spark.blacklist.application.maxFailedExecutorsPerNode = 4
>>7. spark.blacklist.timeout = 5 mins
>>
>>
>>
>> On Fri, Mar 29, 2019 at 11:18 AM Reynold Xin  wrote:
>>
>>> We tried enabling blacklisting for some customers and in the cloud, very
>>> quickly they end up having 0 executors due to various transient errors. So
>>> unfortunately I think the current implementation is terrible for cloud
>>> deployments, and shouldn't be on by default. The heart of the issue is that
>>> the current implementation is not great at dealing with transient errors vs
>>> catastrophic errors.
>>>
>>> +Chris who was involved with those tests.
>>>
>>>
>>>
>>> On Thu, Mar 

Re: [DISCUSS] Enable blacklisting feature by default in 3.0

2019-03-29 Thread Ankur Gupta
Thanks Reynold! That is certainly useful to know.

@Chris Will it be possible for you to send out those details if you still
have them or, better, create a JIRA so someone can work on those
improvements? If there is already a JIRA, can you please provide a link to
it?

Additionally, if the concern is with the aggressiveness of the
blacklisting, then we can enable blacklisting feature by default with
higher thresholds for failures. Below is an alternate set of defaults that
were also proposed in the design document for max cluster utilization:

   1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 2
   2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
   3. spark.blacklist.stage.maxFailedTasksPerExecutor = 5
   4. spark.blacklist.stage.maxFailedExecutorsPerNode = 4
   5. spark.blacklist.application.maxFailedTasksPerExecutor = 5
   6. spark.blacklist.application.maxFailedExecutorsPerNode = 4
   7. spark.blacklist.timeout = 5 mins
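
For anyone who wants these thresholds without waiting on a default change,
they can already be set explicitly; a sketch using the existing conf keys
(available since 2.2.0, as noted below):

import org.apache.spark.SparkConf

// Opt in to blacklisting with the higher thresholds proposed above.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "2")
  .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
  .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "5")
  .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "4")
  .set("spark.blacklist.application.maxFailedTasksPerExecutor", "5")
  .set("spark.blacklist.application.maxFailedExecutorsPerNode", "4")
  .set("spark.blacklist.timeout", "5min")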



On Fri, Mar 29, 2019 at 11:18 AM Reynold Xin  wrote:

> We tried enabling blacklisting for some customers and in the cloud, very
> quickly they end up having 0 executors due to various transient errors. So
> unfortunately I think the current implementation is terrible for cloud
> deployments, and shouldn't be on by default. The heart of the issue is that
> the current implementation is not great at dealing with transient errors vs
> catastrophic errors.
>
> +Chris who was involved with those tests.
>
>
>
> On Thu, Mar 28, 2019 at 3:32 PM, Ankur Gupta <
> ankur.gu...@cloudera.com.invalid> wrote:
>
>> Hi all,
>>
>> This is a follow-on to my PR: https://github.com/apache/spark/pull/24208,
>> where I aimed to enable blacklisting for fetch failure by default. From the
>> comments, there is interest in the community to enable overall blacklisting
>> feature by default. I have listed down 3 different things that we can do
>> and would like to gather feedback and see if anyone has objections with
>> regards to this. Otherwise, I will just create a PR for the same.
>>
>> 1. *Enable blacklisting feature by default*. The blacklisting feature
>> was added as part of SPARK-8425 and is available since 2.2.0. This feature
>> was deemed experimental and was disabled by default. The feature blacklists
>> an executor/node from running a particular task, any task in a particular
>> stage, or all tasks in an application, based on the number of failures.
>> There are various configurations available which control those thresholds.
>> Additionally, the executor/node is only blacklisted for a configurable time
>> period. The idea is to enable the blacklisting feature with the existing
>> defaults, which are the following:
>>
>>1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 1
>>2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>3. spark.blacklist.stage.maxFailedTasksPerExecutor = 2
>>4. spark.blacklist.stage.maxFailedExecutorsPerNode = 2
>>5. spark.blacklist.application.maxFailedTasksPerExecutor = 2
>>6. spark.blacklist.application.maxFailedExecutorsPerNode = 2
>>7. spark.blacklist.timeout = 1 hour
>>
>> 2. *Kill blacklisted executors/nodes by default*. This feature was added
>> as part of SPARK-16554 and is available since 2.2.0. This is a follow-on
>> feature to blacklisting, such that if an executor/node is blacklisted for
>> the application, then it also terminates all running tasks on that executor
>> for faster failure recovery.
>>
>> 3. *Remove legacy blacklisting timeout config*
>> : spark.scheduler.executorTaskBlacklistTime
>>
>> Thanks,
>> Ankur
>>
>
>


[DISCUSS] Enable blacklisting feature by default in 3.0

2019-03-28 Thread Ankur Gupta
Hi all,

This is a follow-on to my PR: https://github.com/apache/spark/pull/24208,
where I aimed to enable blacklisting for fetch failure by default. From the
comments, there is interest in the community to enable overall blacklisting
feature by default. I have listed down 3 different things that we can do
and would like to gather feedback and see if anyone has objections with
regards to this. Otherwise, I will just create a PR for the same.

1. *Enable blacklisting feature by default*. The blacklisting feature was
added as part of SPARK-8425 and is available since 2.2.0. This feature was
deemed experimental and was disabled by default. The feature blacklists an
executor/node from running a particular task, any task in a particular
stage, or all tasks in an application, based on the number of failures.
There are various configurations available which control those thresholds.
Additionally, the executor/node is only blacklisted for a configurable time
period. The idea is to enable the blacklisting feature with the existing
defaults, which are the following:

   1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 1
   2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
   3. spark.blacklist.stage.maxFailedTasksPerExecutor = 2
   4. spark.blacklist.stage.maxFailedExecutorsPerNode = 2
   5. spark.blacklist.application.maxFailedTasksPerExecutor = 2
   6. spark.blacklist.application.maxFailedExecutorsPerNode = 2
   7. spark.blacklist.timeout = 1 hour

2. *Kill blacklisted executors/nodes by default*. This feature was added as
part of SPARK-16554 and is available since 2.2.0. This is a follow-on
feature to blacklisting, such that if an executor/node is blacklisted for
the application, then it also terminates all running tasks on that executor
for faster failure recovery.

3. *Remove legacy blacklisting timeout config*
: spark.scheduler.executorTaskBlacklistTime

Thanks,
Ankur


Re: Persisting driver logs in yarn client mode (SPARK-25118)

2018-08-27 Thread Ankur Gupta
Thanks all for your responses.

So I believe a solution that accomplishes the following would be a good
one:

1. Writes logs to HDFS asynchronously
2. Writes logs at INFO level while ensuring that console logs are written
at WARN level by default (in shell mode)
3. Optionally, moves this file to Yarn's Remote Application Dir (to ensure
that the shutdown operation does not slow down significantly)

If this resolves all the concerns, then I can work on a PR to add this
functionality.
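
A rough sketch of points 1 and 3 above combined: a queue-backed writer so
the driver never blocks on HDFS, with only a final flush at shutdown. The
class name and buffer size are illustrative, not an existing Spark API:

import java.nio.charset.StandardCharsets
import java.util.concurrent.LinkedBlockingQueue
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

class AsyncHdfsLogWriter(dest: Path, conf: Configuration) {
  private val queue = new LinkedBlockingQueue[String](10000) // bounded, illustrative size
  private val out = dest.getFileSystem(conf).create(dest, true)
  private val worker = new Thread(new Runnable {
    override def run(): Unit =
      try {
        while (true) out.write((queue.take() + "\n").getBytes(StandardCharsets.UTF_8))
      } catch { case _: InterruptedException => () } // drain loop ends on close()
  }, "driver-log-drain")
  worker.setDaemon(true)
  worker.start()

  def append(line: String): Unit = queue.offer(line) // never blocks; drops if full
  def close(): Unit = { worker.interrupt(); out.hflush(); out.close() }
}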

On Fri, Aug 24, 2018 at 3:12 PM Marcelo Vanzin 
wrote:

> I think this would be useful, but I also share Saisai's and Marco's
> concern about the extra step when shutting down the application. If
> that could be minimized this would be a much more interesting feature.
>
> e.g. you could upload logs incrementally to HDFS, asynchronously,
> while the app is running. Or you could pipe them to the YARN AM over
> Spark's RPC (losing some logs in the beginning and end of the driver
> execution). Or maybe something else.
>
> There is also the issue of shell logs being at "warn" level by
> default, so even if you write these to a file, they're not really that
> useful for debugging. So a solution that keeps that behavior, but
> writes INFO logs to this new sink, would be great.
>
> If you can come up with a solution to those problems I think this
> could be a good feature.
>
>
> On Wed, Aug 22, 2018 at 10:01 AM, Ankur Gupta
>  wrote:
> > Thanks for your responses Saisai and Marco.
> >
> > I agree that the "rename" operation can be time-consuming on object storage,
> > which can potentially delay the shutdown.
> >
> > I also agree that customers/users have a way to use log appenders to
> write
> > log files and then send them along with Yarn application logs but I still
> > think it is a cumbersome process. Also, there is the issue that customers
> > cannot easily identify which logs belong to which application, without
> > reading the log file. And if users run multiple applications with default
> > log4j configurations on the same host, then they can end up writing to
> the
> > same log file.
> >
> > Because of the issues mentioned above, we can maybe think of this as an
> > optional feature, which will be disabled by default but turned on by
> > customers. This will solve the problems mentioned above, reduce the
> overhead
> > on users/customers while adding a bit of overhead during the shutdown
> phase
> > of Spark Application.
> >
> > Thanks,
> > Ankur
> >
> > On Wed, Aug 22, 2018 at 1:36 AM Marco Gaido 
> wrote:
> >>
> >> I agree with Saisai. You can also configure log4j to append anywhere
> else
> >> other than the console. Many companies have their system for collecting
> and
> >> monitoring logs and they just customize the log4j configuration. I am
> not
> >> sure how needed this change would be.
> >>
> >> Thanks,
> >> Marco
> >>
> >> On Wed, Aug 22, 2018 at 04:31, Saisai Shao
> >> wrote:
> >>>
> >>> One issue I can think of is that this "moving the driver log" step at
> >>> application end is quite time-consuming, which will significantly delay
> >>> the shutdown. We already suffered from such a "rename" problem for the
> >>> event log on object stores; moving the driver log will make the problem
> >>> more severe.
> >>>
> >>> For a vanilla Spark on yarn client application, I think the user could
> >>> redirect the console output to a log file and provide both the driver
> >>> log and the yarn application log to the customers; this seems not a big
> >>> overhead.
> >>>
> >>> Just my two cents.
> >>>
> >>> Thanks
> >>> Saisai
> >>>
> >>> Ankur Gupta wrote on Wed, Aug 22, 2018 at 5:19 AM:
> >>>>
> >>>> Hi all,
> >>>>
> >>>> I want to highlight a problem that we face here at Cloudera and start
> a
> >>>> discussion on how to go about solving it.
> >>>>
> >>>> Problem Statement:
> >>>> Our customers reach out to us when they face problems in their Spark
> >>>> Applications. Those problems can be related to Spark, environment
> issues,
> >>>> their own code or something else altogether. A lot of times these
> customers
> >>>> run their Spark Applications in Yarn Client mode, which as we all
> know, uses
> >>>> a ConsoleAppender to print logs to the console. These customers
> usually send
> >>>> their Yarn logs

Re: Persisting driver logs in yarn client mode (SPARK-25118)

2018-08-22 Thread Ankur Gupta
Thanks for your responses Saisai and Marco.

I agree that the "rename" operation can be time-consuming on object storage,
which can potentially delay the shutdown.

I also agree that customers/users have a way to use log appenders to write
log files and then send them along with Yarn application logs but I still
think it is a cumbersome process. Also, there is the issue that customers
cannot easily identify which logs belong to which application, without
reading the log file. And if users run multiple applications with default
log4j configurations on the same host, then they can end up writing to the
same log file.

Because of the issues mentioned above, we can maybe think of this as an
optional feature, which will be disabled by default but turned on by
customers. This will solve the problems mentioned above, reduce the
overhead on users/customers while adding a bit of overhead during the
shutdown phase of Spark Application.

Thanks,
Ankur

On Wed, Aug 22, 2018 at 1:36 AM Marco Gaido  wrote:

> I agree with Saisai. You can also configure log4j to append anywhere else
> other than the console. Many companies have their system for collecting and
> monitoring logs and they just customize the log4j configuration. I am not
> sure how needed this change would be.
>
> Thanks,
> Marco
>
> On Wed, Aug 22, 2018 at 04:31, Saisai Shao <
> sai.sai.s...@gmail.com> wrote:
>
>> One issue I can think of is that this "moving the driver log" step at
>> application end is quite time-consuming, which will significantly delay the
>> shutdown. We already suffered from such a "rename" problem for the event log
>> on object stores; moving the driver log will make the problem more severe.
>>
>> For a vanilla Spark on yarn client application, I think the user could
>> redirect the console output to a log file and provide both the driver log
>> and the yarn application log to the customers; this seems not a big
>> overhead.
>>
>> Just my two cents.
>>
>> Thanks
>> Saisai
>>
>> Ankur Gupta wrote on Wed, Aug 22, 2018 at 5:19 AM:
>>
>>> Hi all,
>>>
>>> I want to highlight a problem that we face here at Cloudera and start a
>>> discussion on how to go about solving it.
>>>
>>> *Problem Statement:*
>>> Our customers reach out to us when they face problems in their Spark
>>> Applications. Those problems can be related to Spark, environment issues,
>>> their own code or something else altogether. A lot of times these customers
>>> run their Spark Applications in Yarn Client mode, which as we all know,
>>> uses a ConsoleAppender to print logs to the console. These customers
>>> usually send their Yarn logs to us to troubleshoot. As you may have
>>> figured, these logs do not contain driver logs, which makes it difficult for
>>> us to troubleshoot the issue. In that scenario our customers end up running
>>> the application again, piping the output to a log file or using a local log
>>> appender and then sending over that file.
>>>
>>> I believe that there are other users in the community who also face a
>>> similar problem, where the central team managing Spark clusters faces
>>> difficulty in helping the end users because they ran their application in
>>> shell or yarn client mode (I am not sure what is the equivalent in Mesos).
>>>
>>> Additionally, there may be teams who want to capture all these logs so
>>> that they can be analyzed at some later point in time and the fact that
>>> driver logs are not a part of Yarn Logs causes them to capture only partial
>>> logs or makes it difficult to capture all the logs.
>>>
>>> *Proposed Solution:*
>>> One "low touch" approach will be to create an ApplicationListener which
>>> listens for Application Start and Application End events. On Application
>>> Start, this listener will append a Log Appender which writes to a local or
>>> remote (eg:hdfs) log file in an application specific directory and moves
>>> this to Yarn's Remote Application Dir (or equivalent Mesos Dir) on
>>> application end. This way the logs will be available as part of Yarn Logs.
>>>
>>> I am also interested in hearing about other ideas that the community may
>>> have about this. Or if someone has already solved this problem, then I
>>> would like them to contribute their solution to the community.
>>>
>>> Thanks,
>>> Ankur
>>>
>>


Persisting driver logs in yarn client mode (SPARK-25118)

2018-08-21 Thread Ankur Gupta
Hi all,

I want to highlight a problem that we face here at Cloudera and start a
discussion on how to go about solving it.

*Problem Statement:*
Our customers reach out to us when they face problems in their Spark
Applications. Those problems can be related to Spark, environment issues,
their own code or something else altogether. A lot of times these customers
run their Spark Applications in Yarn Client mode, which as we all know,
uses a ConsoleAppender to print logs to the console. These customers
usually send their Yarn logs to us to troubleshoot. As you may have
figured, these logs do not contain driver logs, which makes it difficult for
us to troubleshoot the issue. In that scenario our customers end up running
the application again, piping the output to a log file or using a local log
appender and then sending over that file.

I believe that there are other users in the community who also face a
similar problem, where the central team managing Spark clusters faces
difficulty in
helping the end users because they ran their application in shell or yarn
client mode (I am not sure what is the equivalent in Mesos).

Additionally, there may be teams who want to capture all these logs so that
they can be analyzed at some later point in time and the fact that driver
logs are not a part of Yarn Logs causes them to capture only partial logs
or makes it difficult to capture all the logs.

*Proposed Solution:*
One "low touch" approach will be to create an ApplicationListener which
listens for Application Start and Application End events. On Application
Start, this listener will append a Log Appender which writes to a local or
remote (eg:hdfs) log file in an application specific directory and moves
this to Yarn's Remote Application Dir (or equivalent Mesos Dir) on
application end. This way the logs will be available as part of Yarn Logs.
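
A rough sketch of such a listener, for illustration only: log4j 1.x API,
error handling and the Mesos equivalent omitted, and localLogPath /
remoteAppDir are hypothetical names. It could be wired in through the
spark.extraListeners conf.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.log4j.{FileAppender, Logger, PatternLayout}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

class DriverLogListener(localLogPath: String, remoteAppDir: Path) extends SparkListener {

  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    // Attach a file appender so driver logs also land in a local file,
    // independently of the console appender's level.
    val appender = new FileAppender(new PatternLayout("%d %p %c: %m%n"), localLogPath)
    Logger.getRootLogger.addAppender(appender)
  }

  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
    // Copy the local file into Yarn's remote application directory on shutdown.
    val fs = remoteAppDir.getFileSystem(new Configuration())
    fs.copyFromLocalFile(new Path(localLogPath), remoteAppDir)
  }
}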

I am also interested in hearing about other ideas that the community may
have about this. Or if someone has already solved this problem, then I
would like them to contribute their solution to the community.

Thanks,
Ankur


Re: Spark GraphFrame ConnectedComponents

2017-01-05 Thread Ankur Srivastava
Adding DEV mailing list to see if this is a defect with ConnectedComponent
or if they can recommend any solution.

Thanks
Ankur

On Thu, Jan 5, 2017 at 1:10 PM, Ankur Srivastava <ankur.srivast...@gmail.com
> wrote:

> Yes I did try it out and it chooses the local file system as my checkpoint
> location starts with s3n://
>
> I am not sure how can I make it load the S3FileSystem.
>
> On Thu, Jan 5, 2017 at 12:12 PM, Felix Cheung <felixcheun...@hotmail.com>
> wrote:
>
>> Right, I'd agree, it seems to be only with delete.
>>
>> Could you by chance run just the delete to see if it fails
>>
>> FileSystem.get(sc.hadoopConfiguration)
>> .delete(new Path(somepath), true)
>> --
>> *From:* Ankur Srivastava <ankur.srivast...@gmail.com>
>> *Sent:* Thursday, January 5, 2017 10:05:03 AM
>> *To:* Felix Cheung
>> *Cc:* u...@spark.apache.org
>>
>> *Subject:* Re: Spark GraphFrame ConnectedComponents
>>
>> Yes it works to read the vertices and edges data from S3 location and is
>> also able to write the checkpoint files to S3. It only fails when deleting
>> the data and that is because it tries to use the default file system. I
>> tried looking up how to update the default file system but could not find
>> anything in that regard.
>>
>> Thanks
>> Ankur
>>
>> On Thu, Jan 5, 2017 at 12:55 AM, Felix Cheung <felixcheun...@hotmail.com>
>> wrote:
>>
>>> From the stack it looks to be an error from the explicit call to
>>> hadoop.fs.FileSystem.
>>>
>>> Is the URL scheme for s3n registered?
>>> Does it work when you try to read from s3 from Spark?
>>>
>>> _
>>> From: Ankur Srivastava <ankur.srivast...@gmail.com>
>>> Sent: Wednesday, January 4, 2017 9:23 PM
>>> Subject: Re: Spark GraphFrame ConnectedComponents
>>> To: Felix Cheung <felixcheun...@hotmail.com>
>>> Cc: <u...@spark.apache.org>
>>>
>>>
>>>
>>> This is the exact trace from the driver logs
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Wrong
>>> FS: s3n:///8ac233e4-10f9-4eb3-aa53-df6d9d7ea7be/connected-components-c1dbc2b0/3,
>>> expected: file:///
>>> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
>>> at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
>>> at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529)
>>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
>>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
>>> at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:534)
>>> at org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$ConnectedComponents$$run(ConnectedComponents.scala:340)
>>> at org.graphframes.lib.ConnectedComponents.run(ConnectedComponents.scala:139)
>>> at GraphTest.main(GraphTest.java:31) --- Application Class
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> And I am running spark v 1.6.2 and graphframes v 0.3.0-spark1.6-s_2.10
>>>
>>> Thanks
>>> Ankur
>>>
>>> On Wed, Jan 4, 2017 at 8:03 PM, Ankur Srivastava <
>>> ankur.srivast...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I am rerunning the pipeline to generate the exact trace, I have below
>>>> part of trace from last run:
>>>>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Wrong
>>>> FS: s3n://, expected: file:///
>>>> at org.apache.hadoop.f
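
The "Wrong FS" error in this thread comes from obtaining the default
filesystem (file:///) for an s3n:// path; the usual remedy is to resolve
the FileSystem from the path itself rather than via FileSystem.get. A
sketch, with sc being the SparkContext and the bucket name hypothetical:

import org.apache.hadoop.fs.Path

// Resolve the filesystem from the path's own scheme instead of relying on
// fs.defaultFS, so the recursive delete runs against S3, not the local FS.
val checkpointDir = new Path("s3n://my-bucket/checkpoints/cc") // hypothetical path
val fs = checkpointDir.getFileSystem(sc.hadoopConfiguration)
fs.delete(checkpointDir, true)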

Re: JavaRDD using Reflection

2015-09-14 Thread Ankur Srivastava
It is not reflection that is the issue here but the use of an RDD
transformation "featureKeyClassPair.map" inside "lines.mapToPair".

From the code snippet you have sent it is not very clear whether
getFeatureScore(id, data) invokes executeFeedFeatures, but if that is the
case, it is not obvious that "data" is supposed to be huge and thus needs to
be a PairRDD. If it is not, you do not need to use a JavaPairRDD<String,
String>; instead use a Map<String, String> and return a List.

If the data is huge and has to be a PairRDD, pull out the logic to build the
data PairRDD and then invoke the map function on that RDD.
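
Sketched concretely for the small-data case (all names here are
hypothetical, not the code from the thread): collect or construct the small
side up front, broadcast it, and keep the scoring a plain function, so no
RDD operation ever nests inside another transformation.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Stand-in scoring logic operating on one record plus broadcast parameters.
def scoreOne(key: String, value: String, params: Map[String, String]): Double =
  value.length * params("threshold").toDouble

def scoreAll(sc: SparkContext, featureData: RDD[(String, String)],
             featureParam: Map[String, String]): RDD[(String, Double)] = {
  val params = sc.broadcast(featureParam) // small map shipped once per executor
  featureData.map { case (k, v) => (k, scoreOne(k, v, params.value)) }
}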

- Ankur

On Mon, Sep 14, 2015 at 12:43 PM, <rachana.srivast...@thomsonreuters.com>
wrote:

> Thanks so much Ajay and Ankur for your input.
>
>
>
> What we are trying to do is following:  I am trying to invoke a class
> using Java reflection to get the result
>
>
>
> *THIS WORKS FINE*
>
> public static void main(String[] args) throws Exception {
>
> final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
>
> ...
>
> METHOD THAT I AM TRYING TO INVOKE USING REFLECTION
>
> JavaPairDStream<String, String> urlFeatureScore = lines.mapToPair( new
> PairFunction<String, String, String>() {
>
> public Tuple2<String, String> call(final String urlString) throws
> Exception {
>
> String  featureScore = getFeatureScore(id,data);
>
>   return new Tuple2<String, String>(urlString,  featureScore);
>
> }
>
>   });
>
> ...
>
> *REPLACED WITH METHOD INVOKED USING REFLECTION DOES NOT WORK ERROR MESSAGE
> BELOW.*
>
> >  executeFeedFactories2(featureClassName, featureParam, featureData)
>
> jssc.start();
>
> jssc.awaitTermination();
>
>   }
>
>
>
> *Splitting the same work to Class  using Reflection does not work:*
>
>
>
> private static  JavaRDD  executeFeedFactories2(String
> featureClassName, Map<String, String> featureParam,JavaPairRDD<String,
> String> featureData) throws Exception {
>
> Class featureClass = Class.forName(MyClass);
>
>Method m = featureClass.getMethod("executeFeedFeatures",
> Map.class, JavaPairRDD.class);
>
>JavaRDD  score = ( JavaRDD )
> m.invoke(featureClass.newInstance(), featureParam,featureData);
>
> return score;
>
> }
>
>
>
> public class MyClass{
>
> public static JavaRDD executeFeedFeatures(*Map* 
> featureParamMap,JavaPairRDD<String,
> String> featureKeyClassPair ){
>
> featureScoreRDD = featureKeyClassPair.map(new Function<Tuple2<String,
> String>, Double>() {
>
> public Double call(Tuple2<String, String> keyValue) {
>
> …
>
> }
>
> });
>
> return featureScoreRDD;
>
> }
>
>
>
> }
>
>
>
> Thanks again for all your help and advice.
>
>
>
> Regards,
>
>
>
> Rachana
>
>
>
> *From:* Ajay Singal [mailto:asinga...@gmail.com]
> *Sent:* Monday, September 14, 2015 12:20 PM
> *To:* Rachana Srivastava; Ankur Srivastava
> *Cc:* u...@spark.apache.org; dev@spark.apache.org; Ajay Singal
> *Subject:* Re: JavaRDD using Reflection
>
>
>
> Hello Rachana,
>
>
>
> The easiest way would be to start with creating a 'parent' JavaRDD and run
> different filters (based on different input arguments) to create respective
> 'child' JavaRDDs dynamically.
>
>
>
> Notice that the creation of these children RDDs is handled by the
> application driver.
>
>
>
> Hope this helps!
>
> Ajay
>
>
>
> On Mon, Sep 14, 2015 at 1:21 PM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
> Hi Rachana
>
>
>
> I didn't get your question fully, but as the error says you cannot
> perform an RDD transformation or action inside another transformation. In
> your example you are performing an action "rdd2.values.count()" inside
> the "map" transformation. It is not allowed and in any case this would be
> very inefficient too.
>
>
>
> you should do something like this:
>
>
>
> final long rdd2_count = rdd2.values.count()
>
> rdd1.map(x => rdd2_count * x)
>
>
>
> Hope this helps!!
>
>
>
> - Ankur
>
>
>
> On Mon, Sep 14, 2015 at 9:54 AM, Rachana Srivastava <
> rachana.srivast...@markmonitor.com> wrote:
>
> Hello all,
>
>
>
> I am working a problem that requires us to create different set of JavaRDD
> based on differen

Re: Two joins in GraphX Pregel implementation

2015-07-28 Thread Ankur Dave
On 27 Jul 2015, at 16:42, Ulanov, Alexander alexander.ula...@hp.com wrote:

 It seems that the mentioned two joins can be rewritten as one outer join


You're right. In fact, the outer join can be streamlined further using a
method from GraphOps:

g = g.joinVertices(messages)(vprog).cache()

Then, instead of passing newVerts as the active set for mapReduceTriplets,
we could pass `messages`.

If you're interested in proposing a PR for this, I've attached a patch with
these changes and updates to the comments.

On Tue, Jul 28, 2015 at 1:15 AM, Ulanov, Alexander alexander.ula...@hp.com
 wrote:

 I’ve found two PRs (almost identical) for replacing mapReduceTriplets with
 aggregateMessages

[...]
 Do you know the reason why this improvement is not pushed?


There isn't any performance benefit to switching Pregel to use
aggregateMessages while preserving its current interface, because the
interface uses Iterators and would require us to wrap and unwrap them
anyway. The semantics of aggregateMessagesWithActiveSet are otherwise the
same as mapReduceTriplets, so there isn't any functionality we are missing
out on. And this change seems too small to justify introducing a new
version of Pregel, though it would be worthwhile when combined with other
improvements (https://github.com/apache/spark/pull/1217).

Ankur http://www.ankurdave.com/


pregel-simplify-join.patch
Description: Binary data

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

Re: GraphX: New graph operator

2015-06-02 Thread Ankur Dave
I think it would be good to have more basic operators like union or
difference, as long as they have an efficient distributed implementation
and are plausibly useful.

If they can be written in terms of the existing GraphX API, it would be
best to put them into GraphOps to keep the core GraphX implementation
small. The `mask` operation should actually be in GraphOps -- it's only in
GraphImpl for historical reasons. On the other hand, `subgraph` needs to be
in GraphImpl for performance: it accesses EdgeRDDImpl#filter(epred, vpred),
which can't be a public EdgeRDD method because its semantics rely on an
implementation detail (vertex replication).
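
As an illustration of the GraphOps route, a structural union can be
sketched against the public API alone; the duplicate-vertex merge policy
and the handling of parallel edges below are open design choices, not a
settled implementation:

import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Rough sketch: graph union via the public API only. Duplicate vertices
// keep the first attribute seen (arbitrary); parallel edges are retained.
def union[VD: ClassTag, ED: ClassTag](g1: Graph[VD, ED],
                                      g2: Graph[VD, ED]): Graph[VD, ED] = {
  val vertices = g1.vertices.union(g2.vertices).reduceByKey((a, _) => a)
  val edges = g1.edges.union(g2.edges)
  Graph(vertices, edges)
}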

Ankur http://www.ankurdave.com/

On Mon, Jun 1, 2015 at 8:54 AM, Tarek Auel tarek.a...@gmail.com wrote:

 Hello,

 Someone proposed in a Jira issue to implement new graph operations. Sean
 Owen recommended to check first with the mailing list, if this is
 interesting or not.

 So I would like to know, if it is interesting for GraphX to implement the
 operators like:
 http://en.wikipedia.org/wiki/Graph_operations and/or
 http://techieme.in/complex-graph-operations/

 If yes, should they be integrated into GraphImpl (like mask, subgraph
 etc.) or as external library? My feeling is that they are similar to mask.
 Because of consistency they should be part of the graph implementation
 itself.

 What do you guys think? I really would like to bring GraphX forward and
 help to implement some of these.

 Looking forward to hear your opinions
 Tarek




Re: GraphX implementation of ALS?

2015-05-26 Thread Ankur Dave
This is the latest GraphX-based ALS implementation that I'm aware of:
https://github.com/ankurdave/spark/blob/GraphXALS/graphx/src/main/scala/org/apache/spark/graphx/lib/ALS.scala

When I benchmarked it last year, it was about twice as slow as MLlib's ALS,
and I think the latter has gotten faster since then. The performance gap is
because the MLlib version implements some ALS-specific optimizations that
are hard to do using GraphX, such as storing the edges twice (partitioned
by source and by destination) to reduce communication.

Ankur http://www.ankurdave.com/

On Tue, May 26, 2015 at 3:36 PM, Ben Mabey b...@benmabey.com wrote:

 I've heard in a number of presentations that Spark's ALS implementation was
 going to be moved over to a GraphX version. For example, this presentation
 on GraphX at the Spark Summit
 (https://databricks-training.s3.amazonaws.com/slides/graphx@sparksummit_2014-07.pdf,
 slide #23) mentioned a 40 LOC version using the Pregel API. Looking at the
 ALS source on master
 (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala)
 it looks like the original implementation is still being used and no use of
 GraphX can be seen. Other algorithms mentioned in the GraphX presentation
 can be found in the repo
 (https://github.com/apache/spark/tree/master/graphx/src/main/scala/org/apache/spark/graphx/lib)
 already, but I don't see ALS. Could someone link me to the GraphX version
 for comparison purposes? Also, could someone comment on why the newer
 version isn't in use yet (i.e. are there tradeoffs with using the GraphX
 version that make it less desirable)?



Re: GraphX vertex partition/location strategy

2015-01-19 Thread Ankur Dave
No - the vertices are hash-partitioned onto workers independently of the
edges. It would be nice for each vertex to be on the worker with the most
adjacent edges, but we haven't done this yet since it would add a lot of
complexity to avoid load imbalance while reducing the overall communication
by a small factor.

We refer to the number of partitions containing adjacent edges for a
particular vertex as the vertex's replication factor. I think the typical
replication factor for power-law graphs with 100-200 partitions is 10-15,
and placing the vertex at the ideal location would only reduce the
replication factor by 1.

Ankur http://www.ankurdave.com/

On Mon, Jan 19, 2015 at 12:20 PM, Michael Malak 
michaelma...@yahoo.com.invalid wrote:

 Does GraphX make an effort to co-locate vertices onto the same workers as
 the majority (or even some) of its edges?



Re: [VOTE] Designating maintainers for some Spark components

2014-11-06 Thread Ankur Dave
+1 (binding)

Ankur http://www.ankurdave.com/

On Wed, Nov 5, 2014 at 5:31 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 I'd like to formally call a [VOTE] on this model, to last 72 hours. The
 [VOTE] will end on Nov 8, 2014 at 6 PM PST.



Re: PARSING_ERROR from kryo

2014-09-15 Thread Ankur Dave
At 2014-09-15 08:59:48 -0700, Andrew Ash and...@andrewash.com wrote:
 I'm seeing the same exception now on the Spark 1.1.0 release.  Did you ever
 get this figured out?

 [...]

 On Thu, Aug 21, 2014 at 2:14 PM, npanj nitinp...@gmail.com wrote:
 I am getting PARSING_ERROR while running my job on the code checked out up
 to commit# db56f2df1b8027171da1b8d2571d1f2ef1e103b6.

The error is because I merged a GraphX PR that introduced a nondeterministic 
bug [1]. I reverted the faulty PR, but it was too late for the 1.1.0 release. 
The problem should go away if you use branch-1.1 or master. Sorry about that...

Ankur

[1] https://issues.apache.org/jira/browse/SPARK-3400

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Graphx seems to be broken while Creating a large graph(6B nodes in my case)

2014-08-25 Thread Ankur Dave
I posted the fix on the JIRA ticket 
(https://issues.apache.org/jira/browse/SPARK-3190). To update the user list, 
this is indeed an integer overflow problem when summing up the partition sizes. 
The fix is to use Longs for the sum: https://github.com/apache/spark/pull/2106.

Ankur


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: VertexPartition and ShippableVertexPartition

2014-07-28 Thread Ankur Dave
On Mon, Jul 28, 2014 at 4:29 AM, Larry Xiao xia...@sjtu.edu.cn wrote:

 On 7/28/14, 3:41 PM, shijiaxin wrote:

  There is a VertexPartition in the EdgePartition, which is created by
  EdgePartitionBuilder.toEdgePartition.
  And there is also a ShippableVertexPartition in the VertexRDD.
  These two Partitions have a lot of common things like index, data and
  Bitset, why is this necessary?

 Is the VertexPartition in the EdgePartition the Mirror Cache part?


Yes, exactly. The primary copy of each vertex is stored in the VertexRDD
using the index, values, and mask data structures, which together form a
hash map. In addition, each partition of the VertexRDD stores the
corresponding partition of the routing table to facilitate joining with the
edges. The ShippableVertexPartition class encapsulates the vertex hash map
along with a RoutingTablePartition.

After joining the vertices with the edges, the edge partitions cache their
adjacent vertices in the mirror cache. They use the VertexPartition for
this, which provides only the hash map functionality and not the routing
table.

Ankur http://www.ankurdave.com/


Re: GraphX graph partitioning strategy

2014-07-25 Thread Ankur Dave
Hi Larry,

GraphX's graph constructor leaves the edges in their original partitions by
default. To support arbitrary multipass graph partitioning, one idea is to
take advantage of that by partitioning the graph externally to GraphX
(though possibly using information from GraphX such as the degrees), then
pass the partitioned edges to GraphX.

For example, if you had an edge partitioning function that needed the full
triplet to assign a partition, you could do this as follows:

val unpartitionedGraph: Graph[Int, Int] = ...
val numPartitions: Int = 128
def getTripletPartition(e: EdgeTriplet[Int, Int]): PartitionID = ...

// Get the triplets using GraphX, then use Spark to repartition them
val partitionedEdges = unpartitionedGraph.triplets
  .map(e => (getTripletPartition(e), e))
  .partitionBy(new HashPartitioner(numPartitions))
val partitionedGraph = Graph(unpartitionedGraph.vertices, partitionedEdges)


A multipass partitioning algorithm could store its results in the edge
attribute, and then you could use the code above to do the partitioning.
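
For instance, one possible getTripletPartition in that mold, building on
the snippet above (the degree-based assignment policy is purely
illustrative, not a recommended strategy):

// Pass 1: annotate vertices with their degree. Pass 2: route each edge
// toward its higher-degree endpoint's hash.
val withDegrees: Graph[Int, Int] = unpartitionedGraph
  .outerJoinVertices(unpartitionedGraph.degrees) { (_, _, deg) => deg.getOrElse(0) }

def degreePartition(e: EdgeTriplet[Int, Int]): PartitionID = {
  val anchor = if (e.srcAttr >= e.dstAttr) e.srcId else e.dstId
  (anchor.hashCode & Int.MaxValue) % numPartitions
}

val repartitionedEdges = withDegrees.triplets
  .map(e => (degreePartition(e), Edge(e.srcId, e.dstId, e.attr)))
  .partitionBy(new HashPartitioner(numPartitions))
  .map(_._2)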

Ankur http://www.ankurdave.com/


On Wed, Jul 23, 2014 at 11:59 PM, Larry Xiao xia...@sjtu.edu.cn wrote:

 Hi all,

 I'm implementing graph partitioning strategy for GraphX, learning from
 researches on graph computing.

 I have two questions:

 - a specific implement question:
 In current design, only vertex ID of src and dst are provided
 (PartitionStrategy.scala).
 And some strategies require knowledge about the graph (like degrees) and
 can consist more than one passes to finally produce the partition ID.
 So I'm changing the PartitionStrategy.getPartition API to provide more
 info, but I don't want to make it complex. (the current one looks very
 clean)

 - an open question:
 What advice would you give considering partitioning, considering the
 procedure Spark adopt on graph processing?

 Any advice is much appreciated.

 Best Regards,
 Larry Xiao

 Reference

 Bipartite-oriented Distributed Graph Partitioning for Big Learning.
 PowerLyra : Differentiated Graph Computation and Partitioning on Skewed
 Graphs



Re: GraphX graph partitioning strategy

2014-07-25 Thread Ankur Dave
Oops, the code should be:

val unpartitionedGraph: Graph[Int, Int] = ...
val numPartitions: Int = 128
def getTripletPartition(e: EdgeTriplet[Int, Int]): PartitionID = ...

// Get the triplets using GraphX, then use Spark to repartition them
val partitionedEdges = unpartitionedGraph.triplets
  .map(e => (getTripletPartition(e), e))
  .partitionBy(new HashPartitioner(numPartitions))
  .map(pair => Edge(pair._2.srcId, pair._2.dstId, pair._2.attr))
val partitionedGraph = Graph(unpartitionedGraph.vertices, partitionedEdges)


Ankur http://www.ankurdave.com/


Re: GraphX can not unpersist edges of old graph?

2014-06-12 Thread Ankur Dave
We didn't provide an unpersist API for Graph because the internal
dependency structure of a graph can make it hard to unpersist correctly in
a way that avoids recomputation. However, you can directly unpersist a
graph's vertices and edges RDDs using graph.vertices.unpersist() and
graph.edges.unpersist().
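
In an iterative job that pattern might be sketched like this (illustrative
only; initialGraph and numIters are hypothetical, and whether the old RDDs
are safe to drop depends on the graph's dependency structure):

var g: Graph[Int, Int] = initialGraph.cache()
for (_ <- 0 until numIters) {
  val prev = g
  g = g.mapVertices((_, attr) => attr + 1).cache() // stand-in update step
  g.edges.count() // materialize the new graph before dropping the old one
  prev.vertices.unpersist(blocking = false)
  prev.edges.unpersist(blocking = false)
}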

By the way, the memory leak bug with Pregel (SPARK-2025
https://issues.apache.org/jira/browse/SPARK-2025) is fixed in master.

Ankur http://www.ankurdave.com/


Re: Removing spark-debugger.md file from master?

2014-06-03 Thread Ankur Dave
I agree, let's go ahead and remove it.

Ankur http://www.ankurdave.com/


Re: [VOTE] Release Apache Spark 1.0.0 (RC11)

2014-05-27 Thread Ankur Dave
0

OK, I withdraw my downvote.

Ankur http://www.ankurdave.com/


Re: Spark 1.0: outerJoinVertices seems to return null for vertex attributes when input was partitioned and vertex attribute type is changed

2014-05-26 Thread Ankur Dave
This is probably due to SPARK-1931
(https://issues.apache.org/jira/browse/SPARK-1931), which I just fixed in
PR #885 (https://github.com/apache/spark/pull/885). Is the problem resolved
if you use the current Spark master?

Ankur http://www.ankurdave.com/