On Feb 19, 2014, at 8:45 PM, Vladimir Blagojevic <vblag...@redhat.com> wrote:
> Hey guys,
>
> As some of you might know, we have received additional requirements from
> the community and internally to add a few things to the distributed executors
> and map/reduce APIs. On the distributed executors front, we need to enable
> distributed executors to store results into a cache directly rather than
> returning them to the invoker [1]. As soon as we introduce this API, we also
> need an async mechanism to allow notifications of subtask
> completion/failure.

I think we need both in at the same time :-)

> I was thinking we add a concept of
> DistributedTaskExecutionListener which can be specified in
> DistributedTaskBuilder:
>
> <K> DistributedTaskBuilder<T>
>     executionListener(DistributedTaskExecutionListener<K, T> listener);
>
> We needed DistributedTaskExecutionListener anyway. All distributed tasks
> might use some feedback about task progress, completion/failure and so on.
> My proposal is roughly:
>
> public interface DistributedTaskExecutionListener<K, T> {
>
>    void subtaskSent(Address node, Set<K> inputKeys);
>    void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
>    void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
>    void allSubtasksCompleted();
>
> }
>
> So much for that.

I think it would make sense to add this logic for monitoring, plus additional info such as average execution time, etc. I'm not sure this is a generally useful API though, unless people were already asking for it?

> If tasks do not use input keys, these parameters would
> be empty sets. Now for [1] we need to add additional methods to
> DistributedExecutorService. We cannot specify the result cache in
> DistributedTaskBuilder, as we are still bound to the submit methods in
> DistributedExecutorService that return futures, and we don't want that.
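To make the monitoring suggestion concrete, here is a minimal Java 16+ sketch of a listener built on the proposed DistributedTaskExecutionListener, assuming callbacks may arrive concurrently from different threads and that a node has at most one subtask in flight at a time. The interface is redeclared locally (with the success callback spelled subtaskSucceeded), Address is a one-field stand-in record for Infinispan's Address type, and MonitoringListener and its accessors are hypothetical names, not part of any Infinispan release.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for org.infinispan.remoting.transport.Address so this sketch compiles on its own.
record Address(String name) {}

// The listener interface as proposed in the thread; not an existing Infinispan API.
interface DistributedTaskExecutionListener<K, T> {
    void subtaskSent(Address node, Set<K> inputKeys);
    void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
    void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
    void allSubtasksCompleted();
}

// Hypothetical monitoring listener: counts outcomes and averages per-subtask wall-clock time.
// Callbacks may arrive from different threads, so all state is thread-safe.
class MonitoringListener<K, T> implements DistributedTaskExecutionListener<K, T> {
    private final Map<Address, Long> startNanos = new ConcurrentHashMap<>();
    private final AtomicInteger succeeded = new AtomicInteger();
    private final AtomicInteger failed = new AtomicInteger();
    private final AtomicInteger timedSubtasks = new AtomicInteger();
    private final AtomicLong totalNanos = new AtomicLong();
    private volatile boolean completed;

    @Override
    public void subtaskSent(Address node, Set<K> inputKeys) {
        // Assumes at most one in-flight subtask per node.
        startNanos.put(node, System.nanoTime());
    }

    @Override
    public void subtaskFailed(Address node, Set<K> inputKeys, Exception e) {
        failed.incrementAndGet();
        recordElapsed(node);
    }

    @Override
    public void subtaskSucceeded(Address node, Set<K> inputKeys, T result) {
        succeeded.incrementAndGet();
        recordElapsed(node);
    }

    @Override
    public void allSubtasksCompleted() {
        completed = true;
    }

    // Removes the node's start time and adds the elapsed time to the running total.
    private void recordElapsed(Address node) {
        Long start = startNanos.remove(node);
        if (start != null) {
            totalNanos.addAndGet(System.nanoTime() - start);
            timedSubtasks.incrementAndGet();
        }
    }

    int succeededCount() { return succeeded.get(); }
    int failedCount() { return failed.get(); }
    boolean isCompleted() { return completed; }

    // Average wall-clock time per finished subtask in milliseconds, or 0 if none finished yet.
    double averageMillis() {
        int n = timedSubtasks.get();
        return n == 0 ? 0.0 : totalNanos.get() / 1_000_000.0 / n;
    }
}
```

Start times are removed as soon as a subtask finishes, so the map only holds in-flight subtasks; the total and the count are read separately, so the average is approximate while subtasks are still running.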
>
> We need two new void methods:
>
> <T, K> void submitEverywhere(DistributedTask<T> task,
>     Cache<DistExecResultKey<K>, T> result);
> <T, K> void submitEverywhere(DistributedTask<T> task,
>     Cache<DistExecResultKey<K>, T> result, K... input);
>
> Now, why bother with DistExecResultKey? Well, we have tasks that use
> input keys and tasks that don't. So the results cache could only be keyed by
> either keys or execution address, or a combination of the two.
> Therefore, DistExecResultKey could be something like:
>
> public interface DistExecResultKey<K> {
>
>    Address getExecutionAddress();
>    K getKey();
>
> }
>
> If you have a better idea of how to address this aspect, let us know. So
> much for distributed executors.
>
> For map/reduce we also have to enable storing map/reduce task results
> into a cache [2] and allow users to specify a custom cache for intermediate
> results [3]. Part of task [2] is to allow notification about map/reduce
> task progress and completion. Just as for distributed executors, I would add a
> MapReduceTaskExecutionListener interface:
>
> public interface MapReduceTaskExecutionListener {
>
>    void mapTaskInitialized(Address executionAddress);
>    void mapTaskSucceeded(Address executionAddress);
>    void mapTaskFailed(Address executionAddress, Exception cause);
>    void mapPhaseCompleted();
>
>    void reduceTaskInitialized(Address executionAddress);
>    void reduceTaskSucceeded(Address executionAddress);
>    void reduceTaskFailed(Address executionAddress, Exception cause);
>    void reducePhaseCompleted();
>
> }

IMO, at least in the first stage, I would rather use a simpler (Notifying)Future, on which the user can wait until the computation completes: it's simpler and more aligned with the rest of our async API.

>
> while MapReduceTask would have an additional method:
>
> public void execute(Cache<KOut, VOut> resultsCache);

You could overload it with a method that takes only the cache name.
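Because DistExecResultKey instances would be stored as keys in the results cache, any implementation needs value-based equals() and hashCode(). Here is a minimal Java 16+ sketch, assuming a null input key marks tasks that do not use input keys. SimpleResultKey, its factory methods and ResultStoreDemo are hypothetical names, Address is a stand-in record, and a ConcurrentMap stands in for the results Cache (Infinispan's Cache interface extends ConcurrentMap).

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Stand-in for org.infinispan.remoting.transport.Address so this sketch compiles on its own.
record Address(String name) {}

// The result-key interface as proposed in the thread; not an existing Infinispan API.
interface DistExecResultKey<K> {
    Address getExecutionAddress();
    K getKey();
}

// Hypothetical implementation. A record supplies value-based equals()/hashCode(),
// which any key stored in a cache must have. The input key is null for tasks
// that do not use input keys, so such results are keyed by address alone.
record SimpleResultKey<K>(Address executionAddress, K key) implements DistExecResultKey<K> {
    SimpleResultKey {
        Objects.requireNonNull(executionAddress, "executionAddress");
    }

    // Key for a task without input keys: one result per executing node.
    static <V> SimpleResultKey<V> forNode(Address node) {
        return new SimpleResultKey<>(node, null);
    }

    // Key for a task with input keys: one result per (node, input key) pair.
    static <V> SimpleResultKey<V> forKey(Address node, V key) {
        return new SimpleResultKey<>(node, Objects.requireNonNull(key, "key"));
    }

    @Override public Address getExecutionAddress() { return executionAddress; }
    @Override public K getKey() { return key; }
}

// Hypothetical helper showing how per-node results could land in one results map.
final class ResultStoreDemo {
    static <K, T> void store(ConcurrentMap<DistExecResultKey<K>, T> results,
                             Address node, K inputKey, T value) {
        DistExecResultKey<K> resultKey = (inputKey == null)
                ? SimpleResultKey.<K>forNode(node)
                : SimpleResultKey.<K>forKey(node, inputKey);
        results.put(resultKey, value);
    }
}
```

Keying by address alone only works if each node produces a single result per task; including the input key avoids collisions when one node processes several input keys.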
>
> MapReduceTaskExecutionListener could be specified using the fluent
> MapReduceTask API, just as the intermediate cache would be:
>
> public MapReduceTask<KIn, VIn, KOut, VOut>
>     usingIntermediateCache(Cache<KOut, List<VOut>> tmpCache);
>
> thus addressing issue [3]

+1

>
> Let me know what you think,
> Vladimir
>
> [1] https://issues.jboss.org/browse/ISPN-4030
> [2] https://issues.jboss.org/browse/ISPN-4002
> [3] https://issues.jboss.org/browse/ISPN-4021
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev@lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev

Cheers,
--
Mircea Markus
Infinispan lead (www.infinispan.org)