On Feb 25, 2014, at 3:09 PM, Vladimir Blagojevic <vblag...@redhat.com> wrote:
> On 2/25/2014, 7:33 AM, Dan Berindei wrote:
>>
>> Do we really need special support for distributed tasks to write results to
>> another cache? We already allow a task to do
>>
>> cache.getCacheManager().getCache("outputCache").put(k, v)
> Yeah, very good point Dan. Thanks for being the sanity check. Mircea?

+1

>>
>> >
>> >> I was thinking we add a concept of
>> >> DistributedTaskExecutionListener which can be specified in
>> >> DistributedTaskBuilder:
>> >>
>> >> DistributedTaskBuilder<T>
>> >> executionListener(DistributedTaskExecutionListener<K, T> listener);
>> >>
>> >> We needed DistributedTaskExecutionListener anyway. All distributed tasks
>> >> might use some feedback about task progress, completion/failure and so on.
>> >> My proposal is roughly:
>> >>
>> >> public interface DistributedTaskExecutionListener<K, T> {
>> >>
>> >>    void subtaskSent(Address node, Set<K> inputKeys);
>> >>    void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
>> >>    void subtaskSucceded(Address node, Set<K> inputKeys, T result);
>> >>    void allSubtasksCompleted();
>> >>
>> >> }
>> >>
>> >> So much for that.
>> > I think it would make sense to add this logic for monitoring, +
>> > additional info such as average execution time etc. I'm not sure if this
>> > is a generally useful API though, unless there were people asking for it
>> > already?
>> Ok, noted. If you remember any references about this let me know and
>> I'll incorporate what people actually asked for rather than guess.
>>
>> Ok, let's wait until we get some actual requests from users then. TBH I
>> don't think distributed tasks with subtasks are something that users care
>> about. E.g. with Map/Reduce the reduce tasks are not subtasks of the
>> map/combine tasks, so this API wouldn't help.
>>
>> Hadoop has a Reporter interface that allows you to report "ticks" and
>> increment counters, maybe we should add something like that instead?
>
> The subtask I am referring to here is just to denote part of the distributed
> task initiated using dist.executors. This interface (maybe extended a bit
> with ideas from Reporter) could be used for both monitoring and more
> application specific logic about task re-execution and so on.
>
>>
>> I think we should allow each distributed task to deal with output in its own
>> way, the existing API should be enough.
>
> Yes, I can see your point. Mircea?

+1 user driven features

>>
>> >> public interface MapReduceTaskExecutionListener {
>> >>
>> >>    void mapTaskInitialized(Address executionAddress);
>> >>    void mapTaskSucceeded(Address executionAddress);
>> >>    void mapTaskFailed(Address executionTarget, Exception cause);
>> >>    void mapPhaseCompleted();
>> >>
>> >>    void reduceTaskInitialized(Address executionAddress);
>> >>    void reduceTaskSucceeded(Address executionAddress);
>> >>    void reduceTaskFailed(Address address, Exception cause);
>> >>    void reducePhaseCompleted();
>> >>
>> >> }
>> > IMO - in the first stage at least - I would rather use a simpler
>> > (Notifying)Future, on which the user can wait till the computation
>> > happens: it's simpler and more aligned with the rest of our async API.
>> >
>> What do you mean? We already have futures in MapReduceTask API. This API
>> is more fine grained and allows monitoring/reporting of task progress.
>> Please clarify.

ah right, wasn't aware of MapReduceTask.executeAsynchronously() :-)
That's what I was after.

>>
>> I'm not sure about the usefulness of an API like this either... if the
>> intention is to allow the user to collect statistics about duration of
>> various phases, then I think exposing the durations via MapReduceTasks would
>> be better.
> How would you design that API Dan? Something other than listener/callback
> interface?

Functionally, what I was having in mind was JMX stats for the MapReduce tasks
in general: like average execution time, count etc.
Also the ability to cancel a running task through JMX/JON would be nice.
I don't think we need to expose this to the user through the
MapReduceTaskExecutionListener above, though.

>
>>
>> >> while MapReduceTask would have an additional method:
>> >>
>> >> public void execute(Cache<KOut, VOut> resultsCache);
>> > you could overload it with cache name only method.
>> Yeah, good idea. Same for usingIntermediateCache? I actually asked you
>> this here https://issues.jboss.org/browse/ISPN-4021
>>
>> +1 to allow a cache name only. For the intermediate cache I don't think it
>> makes sense to allow a Cache version at all.
> Ok good. Deal.
>
>
> Thanks,
> Vladimir
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev@lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev

Cheers,
--
Mircea Markus
Infinispan lead (www.infinispan.org)
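
For readers following the thread, here is a rough sketch of how the DistributedTaskExecutionListener proposed above might be used for the "monitoring and re-execution" case Vladimir describes. It is only an illustration under stated assumptions: the interface was a proposal in this discussion, not an Infinispan API; String stands in for Infinispan's Address type so the example needs nothing beyond the JDK; and ProgressListener with its accessor methods is a hypothetical name made up for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical interface copied from the proposal in this thread; it is not
// part of the Infinispan API. String stands in for Infinispan's Address type
// so the sketch compiles against the JDK alone.
interface DistributedTaskExecutionListener<K, T> {
    void subtaskSent(String node, Set<K> inputKeys);
    void subtaskFailed(String node, Set<K> inputKeys, Exception e);
    void subtaskSucceded(String node, Set<K> inputKeys, T result); // spelling as proposed
    void allSubtasksCompleted();
}

// Illustrative listener: counts subtask outcomes and remembers the keys of
// failed subtasks so the application could decide to re-run them.
class ProgressListener<K, T> implements DistributedTaskExecutionListener<K, T> {
    private int sent;
    private int failed;
    private boolean completed;
    private final List<T> results = new ArrayList<>();
    private final Set<K> keysToRetry = new LinkedHashSet<>();

    // Methods are synchronized because callbacks could arrive from several threads.
    @Override
    public synchronized void subtaskSent(String node, Set<K> inputKeys) {
        sent++;
    }

    @Override
    public synchronized void subtaskFailed(String node, Set<K> inputKeys, Exception e) {
        failed++;
        keysToRetry.addAll(inputKeys);
    }

    @Override
    public synchronized void subtaskSucceded(String node, Set<K> inputKeys, T result) {
        results.add(result);
    }

    @Override
    public synchronized void allSubtasksCompleted() {
        completed = true;
    }

    // Accessors return copies so callers cannot mutate the listener's state.
    public synchronized int sentCount() { return sent; }
    public synchronized int failedCount() { return failed; }
    public synchronized boolean isCompleted() { return completed; }
    public synchronized List<T> results() { return new ArrayList<>(results); }
    public synchronized Set<K> keysToRetry() { return new LinkedHashSet<>(keysToRetry); }
}
```

Whether something like this belongs in the public API is exactly what the thread leaves open; the conclusion above was to wait for user requests, and to cover plain statistics through JMX instead.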