This is an automated email from the ASF dual-hosted git repository. mattmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drat.git
The following commit(s) were added to refs/heads/master by this push: new 27bbcf6 - address issue where partitioner runs, but no mappers have executed yet, and stop from going to reducers right away new c24cca7 Merge branch 'master' of github.com:apache/drat 27bbcf6 is described below commit 27bbcf68b909d6daadd027dcd7a46e88c9f7d09f Author: Chris Mattmann <chris.a.mattm...@jpl.nasa.gov> AuthorDate: Sun Aug 5 10:31:20 2018 -0700 - address issue where partitioner runs, but no mappers have executed yet, and stop from going to reducers right away --- .../src/main/java/backend/ProcessDratWrapper.java | 26 ++++++++++++++++++++++ .../test/java/backend/TestProcessDratWrapper.java | 20 +++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/proteus/src/main/java/backend/ProcessDratWrapper.java b/proteus/src/main/java/backend/ProcessDratWrapper.java index 420036c..a068685 100644 --- a/proteus/src/main/java/backend/ProcessDratWrapper.java +++ b/proteus/src/main/java/backend/ProcessDratWrapper.java @@ -343,7 +343,17 @@ public class ProcessDratWrapper extends GenericProcess @VisibleForTesting protected boolean stillRunning(List<WorkflowInstance> instances) { + List<WorkflowInstance> partitionInstances = filterPartitioners(instances); List<WorkflowInstance> mapperInstances = filterMappers(instances); + LOG.info("Checking partitioners: inspecting ["+String.valueOf(partitionInstances + .size()) + "] partitioners."); + for (WorkflowInstance partitionInstance: partitionInstances) { + if (isRunning(partitionInstance.getState().getName())) { + LOG.info("Partitioner: [" + partitionInstance.getId() + "] still running."); + return true; + } + } + LOG.info("Checking mappers: inspecting [" + String.valueOf(mapperInstances.size()) + "] mappers."); for (WorkflowInstance mapperInstance : mapperInstances) { @@ -354,6 +364,22 @@ public class ProcessDratWrapper extends GenericProcess } return false; } + + @VisibleForTesting + protected List<WorkflowInstance> filterPartitioners(List<WorkflowInstance> instances){ + List<WorkflowInstance> partitioners = new ArrayList<>(); + if(instances!=null && instances.size()>0){ + for(WorkflowInstance instance:instances){ + if (instance.getCurrentTask().getTaskId().equals(PARTITION_AND_MAP_TASK_ID)) { + LOG.info("Adding partition/map: ["+instance.getCurrentTask().getTaskId()+"]"); + partitioners.add(instance); + }else{ + LOG.info("Filtering task: [" + instance.getCurrentTask().getTaskId() + "]"); + } + } + } + return partitioners; + } @VisibleForTesting protected List<WorkflowInstance> filterMappers(List<WorkflowInstance> instances){ diff --git a/proteus/src/test/java/backend/TestProcessDratWrapper.java b/proteus/src/test/java/backend/TestProcessDratWrapper.java index dbe768d..181837d 100644 --- a/proteus/src/test/java/backend/TestProcessDratWrapper.java +++ b/proteus/src/test/java/backend/TestProcessDratWrapper.java @@ -54,6 +54,26 @@ public class TestProcessDratWrapper extends TestCase { } assertTrue(wrapper.stillRunning(insts)); } + + public void testFilterPartitioners(){ + ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance(); + assertNotNull(wrapper); + String cmdLines = "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=FINISHED, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" + + "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:MimePartitioner, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]\n" + + "Instance: [id=d3aed64f-6e7c-11e7-af03-cb83c51de744, status=PGE EXEC, currentTask=urn:drat:RatCodeAudit, workflow=Dynamic Workflow-6fc5fc4c-d27a-47f6-905c-2f2e99fa92e9,wallClockTime=0.13265,currentTaskWallClockTime=0.0]"; + + List<WorkflowItem> items = null; + items = wrapper.parseWorkflows(cmdLines); + assertNotNull(items); + List<WorkflowInstance> insts = new ArrayList<WorkflowInstance>(items.size()); + for(WorkflowItem wi: items) { + insts.add(wi.toInstance()); + } + List<WorkflowInstance> partitioners = null; + partitioners = wrapper.filterPartitioners(insts); + assertNotNull(partitioners); + assertEquals(2, partitioners.size()); + } public void testFilterMappers(){ ProcessDratWrapper wrapper = ProcessDratWrapper.getInstance();