http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java index bb0dbff..2d63541 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java @@ -41,7 +41,8 @@ public class ParentDeadException extends Exception { super(message, cause); } - public ParentDeadException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) { + public ParentDeadException(final String message, final Throwable cause, + final boolean enableSuppression, final boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java index 3272e57..71cf9b5 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java @@ -34,5 +34,6 @@ import org.apache.reef.wake.EventHandler; @DefaultImplementation(value = GroupCommNetworkHandlerImpl.class) public interface GroupCommNetworkHandler extends EventHandler<Message<GroupCommunicationMessage>> { - void register(Class<? extends Name<String>> groupName, EventHandler<GroupCommunicationMessage> commGroupNetworkHandler); + void register(Class<? extends Name<String>> groupName, + EventHandler<GroupCommunicationMessage> commGroupNetworkHandler); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java index 03c7ec8..29c5cef 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java @@ -117,7 +117,8 @@ public class GroupCommunicationMessage { @Override public String toString() { - return "[" + msgType + " from " + getSource() + " to " + getDestination() + " for " + simpleGroupName + ":" + simpleOperName + "]"; + return "[" + msgType + " from " + getSource() + " to " + getDestination() + " for " + simpleGroupName + ":" + + simpleOperName + "]"; } @Override http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java index ed64c7e..91d6564 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java @@ -113,7 +113,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { @Override public CommunicationGroupDriver addBroadcast(final Class<? extends Name<String>> operatorName, final BroadcastOperatorSpec spec) { - LOG.entering("CommunicationGroupDriverImpl", "addBroadcast", new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec}); + LOG.entering("CommunicationGroupDriverImpl", "addBroadcast", + new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec}); if (finalised) { throw new IllegalStateException("Can't add more operators to a finalised spec"); } @@ -122,14 +123,16 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { topology.setRootTask(spec.getSenderId()); topology.setOperatorSpecification(spec); topologies.put(operatorName, topology); - LOG.exiting("CommunicationGroupDriverImpl", "addBroadcast", Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"})); + LOG.exiting("CommunicationGroupDriverImpl", "addBroadcast", + Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"})); return this; } @Override public CommunicationGroupDriver addReduce(final Class<? extends Name<String>> operatorName, final ReduceOperatorSpec spec) { - LOG.entering("CommunicationGroupDriverImpl", "addReduce", new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec}); + LOG.entering("CommunicationGroupDriverImpl", "addReduce", + new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec}); if (finalised) { throw new IllegalStateException("Can't add more operators to a finalised spec"); } @@ -139,13 +142,15 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { topology.setRootTask(spec.getReceiverId()); topology.setOperatorSpecification(spec); topologies.put(operatorName, topology); - LOG.exiting("CommunicationGroupDriverImpl", "addReduce", Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"})); + LOG.exiting("CommunicationGroupDriverImpl", "addReduce", + Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"})); return this; } @Override public Configuration getTaskConfiguration(final Configuration taskConf) { - LOG.entering("CommunicationGroupDriverImpl", "getTaskConfiguration", new Object[]{getQualifiedName(), confSerializer.toString(taskConf)}); + LOG.entering("CommunicationGroupDriverImpl", "getTaskConfiguration", + new Object[]{getQualifiedName(), confSerializer.toString(taskConf)}); final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder(); final String taskId = taskId(taskConf); if (perTaskState.containsKey(taskId)) { @@ -182,7 +187,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { return null; } final Configuration configuration = jcb.build(); - LOG.exiting("CommunicationGroupDriverImpl", "getTaskConfiguration", Arrays.toString(new Object[]{getQualifiedName(), confSerializer.toString(configuration)})); + LOG.exiting("CommunicationGroupDriverImpl", "getTaskConfiguration", + Arrays.toString(new Object[]{getQualifiedName(), confSerializer.toString(configuration)})); return configuration; } @@ -192,14 +198,17 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { if (!taskState.equals(TaskState.NOT_STARTED)) { LOG.finest(getQualifiedName() + taskId + " has started."); if (taskState.equals(TaskState.RUNNING)) { - LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is running. We can't get config"})); + LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", + Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is running. We can't get config"})); return true; } else { - LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has failed. We can get config"})); + LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", + Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has failed. We can get config"})); return false; } } else { - LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has not started. We can get config"})); + LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", + Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has not started. We can get config"})); return false; } } @@ -211,7 +220,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { @Override public void addTask(final Configuration partialTaskConf) { - LOG.entering("CommunicationGroupDriverImpl", "addTask", new Object[]{getQualifiedName(), confSerializer.toString(partialTaskConf)}); + LOG.entering("CommunicationGroupDriverImpl", "addTask", + new Object[]{getQualifiedName(), confSerializer.toString(partialTaskConf)}); final String taskId = taskId(partialTaskConf); LOG.finest(getQualifiedName() + "AddTask(" + taskId + "). Waiting to acquire toBeRemovedLock"); synchronized (toBeRemovedLock) { @@ -236,7 +246,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { LOG.finest(getQualifiedName() + "Released topologiesLock"); } LOG.fine(getQualifiedName() + "Added " + taskId + " to topology"); - LOG.exiting("CommunicationGroupDriverImpl", "addTask", Arrays.toString(new Object[]{getQualifiedName(), "Added task: ", taskId})); + LOG.exiting("CommunicationGroupDriverImpl", "addTask", + Arrays.toString(new Object[]{getQualifiedName(), "Added task: ", taskId})); } public void removeTask(final String taskId) { @@ -261,7 +272,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { LOG.finest(getQualifiedName() + "Released toBeRemovedLock"); } LOG.fine(getQualifiedName() + "Removed " + taskId + " to topology"); - LOG.exiting("CommunicationGroupDriverImpl", "removeTask", Arrays.toString(new Object[]{getQualifiedName(), "Removed task: ", taskId})); + LOG.exiting("CommunicationGroupDriverImpl", "removeTask", + Arrays.toString(new Object[]{getQualifiedName(), "Removed task: ", taskId})); } public void runTask(final String id) { @@ -290,10 +302,12 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { LOG.finest(getQualifiedName() + "Released yetToRunLock"); } if (nonMember) { - LOG.exiting("CommunicationGroupDriverImpl", "runTask", getQualifiedName() + id + " does not belong to this communication group. Ignoring"); + LOG.exiting("CommunicationGroupDriverImpl", "runTask", + getQualifiedName() + id + " does not belong to this communication group. Ignoring"); } else { LOG.fine(getQualifiedName() + "Status of task " + id + " changed to RUNNING"); - LOG.exiting("CommunicationGroupDriverImpl", "runTask", Arrays.toString(new Object[]{getQualifiedName(), "Set running complete on task ", id})); + LOG.exiting("CommunicationGroupDriverImpl", "runTask", + Arrays.toString(new Object[]{getQualifiedName(), "Set running complete on task ", id})); } } @@ -342,7 +356,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { LOG.finest(getQualifiedName() + "Released configLock"); } LOG.fine(getQualifiedName() + "Status of task " + id + " changed to FAILED"); - LOG.exiting("CommunicationGroupDriverImpl", "failTask", Arrays.toString(new Object[]{getQualifiedName(), "Set failed complete on task ", id})); + LOG.exiting("CommunicationGroupDriverImpl", "failTask", + Arrays.toString(new Object[]{getQualifiedName(), "Set failed complete on task ", id})); } private boolean cantFailTask(final String taskId) { @@ -351,14 +366,18 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { if (!taskState.equals(TaskState.NOT_STARTED)) { LOG.finest(getQualifiedName() + taskId + " has started."); if (!taskState.equals(TaskState.RUNNING)) { - LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is not running yet. Can't set failure"})); + LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", + Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is not running yet. Can't set failure"})); return true; } else { - LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " is running. Can set failure"})); + LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", + Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " is running. Can set failure"})); return false; } } else { - LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " has not started. We can't fail a task that hasn't started"})); + LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", + Arrays.toString(new Object[]{true, getQualifiedName(), taskId, + " has not started. We can't fail a task that hasn't started"})); return true; } } @@ -369,8 +388,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { final Class<? extends Name<String>> operName = indMsg.getOperName(); final MsgKey key = new MsgKey(msg); if (msgQue.contains(key, indMsg)) { - throw new RuntimeException(getQualifiedName() + "MsgQue already contains " + msg.getType() + " msg for " + key + " in " - + Utils.simpleName(operName)); + throw new RuntimeException(getQualifiedName() + "MsgQue already contains " + msg.getType() + " msg for " + key + + " in " + Utils.simpleName(operName)); } LOG.finest(getQualifiedName() + "Adding msg to que"); msgQue.add(key, indMsg); @@ -382,7 +401,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { } LOG.finest(getQualifiedName() + "All msgs processed and removed"); } - LOG.exiting("CommunicationGroupDriverImpl", "queNProcessMsg", Arrays.toString(new Object[]{getQualifiedName(), "Que & Process done for: ", msg})); + LOG.exiting("CommunicationGroupDriverImpl", "queNProcessMsg", + Arrays.toString(new Object[]{getQualifiedName(), "Que & Process done for: ", msg})); } private boolean isMsgVersionOk(final GroupCommunicationMessage msg) { @@ -393,7 +413,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { final int expSrcVersion = topologies.get(Utils.getClass(msg.getOperatorname())).getNodeVersion(srcId); final boolean srcVersionChk = chkVersion(rcvSrcVersion, expSrcVersion, "Src Version Check: "); - LOG.exiting("CommunicationGroupDriverImpl", "isMsgVersionOk", Arrays.toString(new Object[]{srcVersionChk, getQualifiedName(), msg})); + LOG.exiting("CommunicationGroupDriverImpl", "isMsgVersionOk", + Arrays.toString(new Object[]{srcVersionChk, getQualifiedName(), msg})); return srcVersionChk; } else { throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs"); @@ -423,8 +444,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { return; } if (initializing.get() || msg.getType().equals(ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology)) { - LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": Waiting for all required(" + allTasksAdded.getInitialCount() + - ") nodes to run"); + LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": Waiting for all required(" + + allTasksAdded.getInitialCount() + ") nodes to run"); allTasksAdded.await(); LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": All required(" + allTasksAdded.getInitialCount() + ") nodes are running"); @@ -433,7 +454,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { queNProcessMsg(msg); LOG.finest(getQualifiedName() + "Released topologiesLock"); } - LOG.exiting("CommunicationGroupDriverImpl", "processMsg", Arrays.toString(new Object[]{getQualifiedName(), "ProcessMsg done for: ", msg})); + LOG.exiting("CommunicationGroupDriverImpl", "processMsg", + Arrays.toString(new Object[]{getQualifiedName(), "ProcessMsg done for: ", msg})); } private String taskId(final Configuration partialTaskConf) { @@ -441,7 +463,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver { final Injector injector = Tang.Factory.getTang().newInjector(partialTaskConf); return injector.getNamedInstance(TaskConfigurationOptions.Identifier.class); } catch (final InjectionException e) { - throw new RuntimeException(getQualifiedName() + "Injection exception while extracting taskId from partialTaskConf", e); + throw new RuntimeException(getQualifiedName() + + "Injection exception while extracting taskId from partialTaskConf", e); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java index 497d032..e2e1093 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java @@ -71,7 +71,8 @@ public class FlatTopology implements Topology { private final ConcurrentMap<String, TaskNode> nodes = new ConcurrentSkipListMap<>(); public FlatTopology(final EStage<GroupCommunicationMessage> senderStage, - final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operatorName, + final Class<? extends Name<String>> groupName, + final Class<? extends Name<String>> operatorName, final String driverId, final int numberOfTasks) { this.senderStage = senderStage; this.groupName = groupName; @@ -279,7 +280,8 @@ public class FlatTopology implements Topology { } for (final TaskNode node : toBeUpdatedNodes) { node.updatingTopology(); - senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(), + senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(), node.getVersion(), Utils.EMPTY_BYTE_ARR)); } nodeTopologyUpdateWaitStage.onNext(toBeUpdatedNodes); @@ -297,7 +299,8 @@ public class FlatTopology implements Topology { } final GroupChanges changes = new GroupChangesImpl(hasTopologyChanged); final Codec<GroupChanges> changesCodec = new GroupChangesCodec(); - senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId), + senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId), changesCodec.encode(changes))); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java index 089bfb3..3e08b07 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java @@ -206,11 +206,13 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { @Override public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName, final int numberOfTasks) { - LOG.entering("GroupCommDriverImpl", "newCommunicationGroup", new Object[]{Utils.simpleName(groupName), numberOfTasks}); + LOG.entering("GroupCommDriverImpl", "newCommunicationGroup", + new Object[]{Utils.simpleName(groupName), numberOfTasks}); final BroadcastingEventHandler<RunningTask> commGroupRunningTaskHandler = new BroadcastingEventHandler<>(); final BroadcastingEventHandler<FailedTask> commGroupFailedTaskHandler = new BroadcastingEventHandler<>(); final BroadcastingEventHandler<FailedEvaluator> commGroupFailedEvaluatorHandler = new BroadcastingEventHandler<>(); - final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler = new BroadcastingEventHandler<>(); + final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler = + new BroadcastingEventHandler<>(); final CommunicationGroupDriver commGroupDriver = new CommunicationGroupDriverImpl(groupName, confSerializer, senderStage, commGroupRunningTaskHandler, @@ -222,7 +224,8 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { groupCommRunningTaskHandler.addHandler(commGroupRunningTaskHandler); groupCommFailedTaskHandler.addHandler(commGroupFailedTaskHandler); groupCommMessageHandler.addHandler(groupName, commGroupMessageHandler); - LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup", "Created communication group: " + Utils.simpleName(groupName)); + LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup", + "Created communication group: " + Utils.simpleName(groupName)); return commGroupDriver; } @@ -311,7 +314,7 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { @Override public EStage<FailedEvaluator> getGroupCommFailedEvaluatorStage() { LOG.entering("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage"); - LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage", "Returning GroupCommFaileEvaluatorStage"); + LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage", "Returning GroupCommFailedEvaluatorStage"); return groupCommFailedEvaluatorStage; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java index 1589bd7..f1a70be 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java @@ -56,7 +56,8 @@ public class TaskNodeImpl implements TaskNode { private final AtomicInteger version = new AtomicInteger(0); public TaskNodeImpl(final EStage<GroupCommunicationMessage> senderStage, - final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operatorName, + final Class<? extends Name<String>> groupName, + final Class<? extends Name<String>> operatorName, final String taskId, final String driverId, final boolean isRoot) { this.senderStage = senderStage; this.groupName = groupName; @@ -102,7 +103,8 @@ public class TaskNodeImpl implements TaskNode { LOG.entering("TaskNodeImpl", "onFailedTask", getQualifiedName()); if (!running.compareAndSet(true, false)) { LOG.fine(getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!"); - LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!"); + LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName() + + "Trying to set failed on an already failed task. Something fishy!!!"); return; } taskNodeStatus.clearStateAndReleaseLocks(); @@ -129,13 +131,15 @@ public class TaskNodeImpl implements TaskNode { LOG.entering("TaskNodeImpl", "onRunningTask", getQualifiedName()); if (!running.compareAndSet(false, true)) { LOG.fine(getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!"); - LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!"); + LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName() + + "Trying to set running on an already running task. Something fishy!!!"); return; } final int version = this.version.get(); LOG.finest(getQualifiedName() + "Changed status to running version-" + version); if (parent != null && parent.isRunning()) { - final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parent.getTaskId(), + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parent.getTaskId(), parent.getVersion(), taskId, version, Utils.EMPTY_BYTE_ARR); taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); @@ -146,7 +150,8 @@ public class TaskNodeImpl implements TaskNode { } for (final TaskNode child : children) { if (child.isRunning()) { - final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, child.getTaskId(), + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, child.getTaskId(), child.getVersion(), taskId, version, Utils.EMPTY_BYTE_ARR); taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); @@ -167,7 +172,8 @@ public class TaskNodeImpl implements TaskNode { if (parent != null && parent.isRunning()) { final int parentVersion = parent.getVersion(); final String parentTaskId = parent.getTaskId(); - final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parentTaskId, + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parentTaskId, parentVersion, taskId, version.get(), Utils.EMPTY_BYTE_ARR); taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); @@ -186,7 +192,8 @@ public class TaskNodeImpl implements TaskNode { final int parentVersion = parent.getVersion(); final String parentTaskId = parent.getTaskId(); taskNodeStatus.updateFailureOf(parent.getTaskId()); - final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead, parentTaskId, + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead, parentTaskId, parentVersion, taskId, version.get(), Utils.EMPTY_BYTE_ARR); taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); @@ -203,7 +210,8 @@ public class TaskNodeImpl implements TaskNode { final TaskNode childTask = findTask(childId); if (childTask != null && childTask.isRunning()) { final int childVersion = childTask.getVersion(); - final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, childId, + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, childId, childVersion, taskId, version.get(), Utils.EMPTY_BYTE_ARR); taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); @@ -223,13 +231,15 @@ public class TaskNodeImpl implements TaskNode { if (childTask != null) { final int childVersion = childTask.getVersion(); taskNodeStatus.updateFailureOf(childId); - final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, childId, + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, childId, childVersion, taskId, version.get(), Utils.EMPTY_BYTE_ARR); taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); senderStage.onNext(gcm); } else { - throw new RuntimeException(getQualifiedName() + "Don't expect task for " + childId + " to be null. Something wrong"); + throw new RuntimeException(getQualifiedName() + "Don't expect task for " + childId + + " to be null. Something wrong"); } LOG.exiting("TaskNodeImpl", "onChildDead", getQualifiedName() + childId); } @@ -327,7 +337,8 @@ public class TaskNodeImpl implements TaskNode { private void sendTopoSetupMsg() { LOG.entering("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName() + taskId); LOG.fine(getQualifiedName() + "is an active participant in the topology"); - senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, driverId, 0, taskId, + senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, driverId, 0, taskId, version.get(), Utils.EMPTY_BYTE_ARR)); taskNodeStatus.onTopologySetupMessageSent(); final boolean sentAlready = !topoSetupSent.compareAndSet(false, true); @@ -379,28 +390,35 @@ public class TaskNodeImpl implements TaskNode { private boolean parentActive() { LOG.entering("TaskNodeImpl", "parentActive", getQualifiedName()); if (isRoot) { - LOG.exiting("TaskNodeImpl", "parentActive", Arrays.toString(new Object[]{true, getQualifiedName(), "I am root. Will never have parent. So signalling active"})); + LOG.exiting("TaskNodeImpl", "parentActive", + Arrays.toString(new Object[]{true, getQualifiedName(), + "I am root. Will never have parent. So signalling active"})); return true; } if (isNeighborActive(parent.getTaskId())) { - LOG.exiting("TaskNodeImpl", "parentActive", Arrays.toString(new Object[]{true, getQualifiedName(), parent, " is an active neghbor"})); + LOG.exiting("TaskNodeImpl", "parentActive", + Arrays.toString(new Object[]{true, getQualifiedName(), parent, " is an active neighbor"})); return true; } - LOG.exiting("TaskNodeImpl", "parentActive", getQualifiedName() + "Neither root Nor is " + parent + " an active neghbor"); + LOG.exiting("TaskNodeImpl", "parentActive", + getQualifiedName() + "Neither root Nor is " + parent + " an active neighbor"); return false; } private boolean activeNeighborOfParent() { LOG.entering("TaskNodeImpl", "activeNeighborOfParent", getQualifiedName()); if (isRoot) { - LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), "I am root. Will never have parent. So signalling active"})); + LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), + "I am root. Will never have parent. So signalling active"})); return true; } if (parent.isNeighborActive(taskId)) { - LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), "I am an active neighbor of parent ", parent})); + LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), + "I am an active neighbor of parent ", parent})); return true; } - LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{false, getQualifiedName(), "Neither is parent null Nor am I an active neighbor of parent ", parent})); + LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{false, getQualifiedName(), + "Neither is parent null Nor am I an active neighbor of parent ", parent})); return false; } @@ -409,11 +427,13 @@ public class TaskNodeImpl implements TaskNode { for (final TaskNode child : children) { final String childId = child.getTaskId(); if (child.isRunning() && !isNeighborActive(childId)) { - LOG.exiting("TaskNodeImpl", "allChildrenActive", Arrays.toString(new Object[]{false, getQualifiedName(), childId, " not active yet"})); + LOG.exiting("TaskNodeImpl", "allChildrenActive", + Arrays.toString(new Object[]{false, getQualifiedName(), childId, " not active yet"})); return false; } } - LOG.exiting("TaskNodeImpl", "allChildrenActive", Arrays.toString(new Object[]{true, getQualifiedName(), "All children active"})); + LOG.exiting("TaskNodeImpl", "allChildrenActive", + Arrays.toString(new Object[]{true, getQualifiedName(), "All children active"})); return true; } @@ -421,11 +441,13 @@ public class TaskNodeImpl implements TaskNode { LOG.entering("TaskNodeImpl", "activeNeighborOfAllChildren", getQualifiedName()); for (final TaskNode child : children) { if (child.isRunning() && !child.isNeighborActive(taskId)) { - LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", Arrays.toString(new Object[]{false, getQualifiedName(), "Not an active neighbor of child ", child})); + LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", + Arrays.toString(new Object[]{false, getQualifiedName(), "Not an active neighbor of child ", child})); return false; } } - LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", Arrays.toString(new Object[]{true, getQualifiedName(), "Active neighbor of all children"})); + LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", + Arrays.toString(new Object[]{true, getQualifiedName(), "Active neighbor of all children"})); return true; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java index 6503a92..12679b8 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java @@ -117,7 +117,8 @@ public class TaskNodeStatusImpl implements TaskNodeStatus { LOG.entering("TaskNodeStatusImpl", "expectAckFor", new Object[]{getQualifiedName(), msgType, srcId}); LOG.finest(getQualifiedName() + "Adding " + srcId + " to sources"); statusMap.add(msgType, srcId); - LOG.exiting("TaskNodeStatusImpl", "expectAckFor", getQualifiedName() + "Sources from which ACKs for " + msgType + " are expected: " + statusMap.get(msgType)); + LOG.exiting("TaskNodeStatusImpl", "expectAckFor", + getQualifiedName() + "Sources from which ACKs for " + msgType + " are expected: " + statusMap.get(msgType)); } @Override @@ -236,7 +237,8 @@ public class TaskNodeStatusImpl implements TaskNodeStatus { } private String getQualifiedName() { - return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + node.getVersion() + ") - "; + return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + + node.getVersion() + ") - "; } @Override http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java index ad3e385..709f866 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java @@ -55,8 +55,10 @@ public class TopologyUpdateWaitHandler implements EventHandler<List<TaskNode>> { * will continue their regular operation */ public TopologyUpdateWaitHandler(final EStage<GroupCommunicationMessage> senderStage, - final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operName, - final String driverId, final int driverVersion, final String dstId, final int dstVersion, + final Class<? extends Name<String>> groupName, + final Class<? extends Name<String>> operName, + final String driverId, final int driverVersion, + final String dstId, final int dstVersion, final String qualifiedName) { super(); this.senderStage = senderStage; @@ -86,7 +88,8 @@ public class TopologyUpdateWaitHandler implements EventHandler<List<TaskNode>> { LOG.finest(qualifiedName + "NodeTopologyUpdateWaitStage All to be updated nodes " + "have received TopologySetup"); LOG.fine(qualifiedName + "All affected parts of the topology are in TopologyUpdate phase. Will send a note to (" + dstId + "," + dstVersion + ")"); - senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated, driverId, driverVersion, dstId, + senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated, driverId, driverVersion, dstId, dstVersion, Utils.EMPTY_BYTE_ARR)); LOG.exiting("TopologyUpdateWaitHandler", "onNext", qualifiedName); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java index f0366ec..b0a940d 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java @@ -312,7 +312,8 @@ public class TreeTopology implements Topology { for (final TaskNode node : toBeUpdatedNodes) { node.updatingTopology(); LOG.fine(getQualifiedName() + "Asking " + node + " to UpdateTopology"); - senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(), + senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(), node.getVersion(), Utils.EMPTY_BYTE_ARR)); } nodeTopologyUpdateWaitStage.onNext(toBeUpdatedNodes); @@ -334,7 +335,8 @@ public class TreeTopology implements Topology { final GroupChanges changes = new GroupChangesImpl(hasTopologyChanged); final Codec<GroupChanges> changesCodec = new GroupChangesCodec(); LOG.fine(getQualifiedName() + "TopologyChanges: " + changes); - senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId), + senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId), changesCodec.encode(changes))); LOG.exiting("TreeTopology", "onTopologyChanges", getQualifiedName() + msg); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java index 40abe66..a5a0930 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java @@ -38,8 +38,10 @@ public class CommGroupNetworkHandlerImpl implements private static final Logger LOG = Logger.getLogger(CommGroupNetworkHandlerImpl.class.getName()); - private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> operHandlers = new ConcurrentHashMap<>(); - private final Map<Class<? extends Name<String>>, BlockingQueue<GroupCommunicationMessage>> topologyNotifications = new ConcurrentHashMap<>(); + private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> operHandlers = + new ConcurrentHashMap<>(); + private final Map<Class<? extends Name<String>>, BlockingQueue<GroupCommunicationMessage>> topologyNotifications = + new ConcurrentHashMap<>(); @Inject public CommGroupNetworkHandlerImpl() { @@ -50,7 +52,8 @@ public class CommGroupNetworkHandlerImpl implements final EventHandler<GroupCommunicationMessage> operHandler) { LOG.entering("CommGroupNetworkHandlerImpl", "register", new Object[]{Utils.simpleName(operName), operHandler}); operHandlers.put(operName, operHandler); - LOG.exiting("CommGroupNetworkHandlerImpl", "register", Arrays.toString(new Object[]{Utils.simpleName(operName), operHandler})); + LOG.exiting("CommGroupNetworkHandlerImpl", "register", + Arrays.toString(new Object[]{Utils.simpleName(operName), operHandler})); } @Override @@ -65,7 +68,8 @@ public class CommGroupNetworkHandlerImpl implements public void onNext(final GroupCommunicationMessage msg) { LOG.entering("CommGroupNetworkHandlerImpl", "onNext", msg); final Class<? extends Name<String>> operName = Utils.getClass(msg.getOperatorname()); - if (msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated || msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges) { + if (msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated || + msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges) { topologyNotifications.get(operName).add(msg); } else { operHandlers.get(operName).onNext(msg); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java index 1eacb03..f9143d9 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java @@ -205,7 +205,8 @@ public class CommunicationGroupClientImpl implements CommunicationGroupServiceCl final Class<? extends Name<String>> operName = op.getOperName(); LOG.finest("Sending TopologyChanges msg to driver"); try { - sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, taskId, op.getVersion(), driverId, + sender.send(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, taskId, op.getVersion(), driverId, 0, Utils.EMPTY_BYTE_ARR)); } catch (final NetworkException e) { throw new RuntimeException("NetworkException while sending GetTopologyChanges", e); @@ -247,7 +248,8 @@ public class CommunicationGroupClientImpl implements CommunicationGroupServiceCl for (final GroupCommOperator op : operators.values()) { final Class<? extends Name<String>> operName = op.getOperName(); try { - sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, taskId, op.getVersion(), driverId, + sender.send(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, taskId, op.getVersion(), driverId, 0, Utils.EMPTY_BYTE_ARR)); } catch (final NetworkException e) { throw new RuntimeException("NetworkException while sending UpdateTopology", e); @@ -277,7 +279,8 @@ public class CommunicationGroupClientImpl implements CommunicationGroupServiceCl } else { retVal = true; } - LOG.exiting("CommunicationGroupClientImpl", "isMsgVersionOk", Arrays.toString(new Object[]{retVal, getQualifiedName(), msg})); + LOG.exiting("CommunicationGroupClientImpl", "isMsgVersionOk", + Arrays.toString(new Object[]{retVal, getQualifiedName(), msg})); return retVal; } else { throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs"); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java index 30165f9..4e635c9 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java @@ -45,7 +45,8 @@ import java.util.logging.Logger; public class GroupCommClientImpl implements GroupCommClient { private static final Logger LOG = Logger.getLogger(GroupCommClientImpl.class.getName()); - private final Map<Class<? extends Name<String>>, CommunicationGroupServiceClient> communicationGroups = new HashMap<>(); + private final Map<Class<? extends Name<String>>, CommunicationGroupServiceClient> communicationGroups = + new HashMap<>(); @Inject public GroupCommClientImpl( http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java index 867e548..e018832 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java @@ -35,7 +35,8 @@ public class GroupCommNetworkHandlerImpl implements GroupCommNetworkHandler { private static final Logger LOG = Logger.getLogger(GroupCommNetworkHandlerImpl.class.getName()); - private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> commGroupHandlers = new ConcurrentHashMap<>(); + private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> commGroupHandlers = + new ConcurrentHashMap<>(); @Inject public GroupCommNetworkHandlerImpl() { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java index a5c6baa..7418d33 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java @@ -180,7 +180,8 @@ public class OperatorTopologyImpl implements OperatorTopology { } else { retVal = true; } - LOG.exiting("OperatorTopologyImpl", "isMsgVersionOk", Arrays.toString(new Object[]{retVal, getQualifiedName(), msg})); + LOG.exiting("OperatorTopologyImpl", "isMsgVersionOk", + Arrays.toString(new Object[]{retVal, getQualifiedName(), msg})); return retVal; } else { throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs"); @@ -195,21 +196,25 @@ public class OperatorTopologyImpl implements OperatorTopology { } @Override - public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws ParentDeadException { + public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) + throws ParentDeadException { LOG.entering("OperatorTopologyImpl", "sendToParent", new Object[]{getQualifiedName(), data, msgType}); refreshEffectiveTopology(); assert (effectiveTopology != null); effectiveTopology.sendToParent(data, msgType); - LOG.exiting("OperatorTopologyImpl", "sendToParent", Arrays.toString(new Object[]{getQualifiedName(), data, msgType})); + LOG.exiting("OperatorTopologyImpl", "sendToParent", + Arrays.toString(new Object[]{getQualifiedName(), data, msgType})); } @Override - public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws ParentDeadException { + public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) + throws ParentDeadException { LOG.entering("OperatorTopologyImpl", "sendToChildren", new Object[]{getQualifiedName(), data, msgType}); refreshEffectiveTopology(); assert (effectiveTopology != null); effectiveTopology.sendToChildren(data, msgType); - LOG.exiting("OperatorTopologyImpl", "sendToChildren", Arrays.toString(new Object[]{getQualifiedName(), data, msgType})); + LOG.exiting("OperatorTopologyImpl", "sendToChildren", + Arrays.toString(new Object[]{getQualifiedName(), data, msgType})); } @Override @@ -223,7 +228,8 @@ public class OperatorTopologyImpl implements OperatorTopology { } @Override - public <T> T recvFromChildren(final Reduce.ReduceFunction<T> redFunc, final Codec<T> dataCodec) throws ParentDeadException { + public <T> T recvFromChildren(final Reduce.ReduceFunction<T> redFunc, final Codec<T> dataCodec) + throws ParentDeadException { LOG.entering("OperatorTopologyImpl", "recvFromChildren", getQualifiedName()); refreshEffectiveTopology(); assert (effectiveTopology != null); @@ -288,9 +294,12 @@ public class OperatorTopologyImpl implements OperatorTopology { baseTopology.setChanges(true); LOG.finest(getQualifiedName() + "Waiting for ctrl msgs"); - for (GroupCommunicationMessage msg = deltas.take(); msg.getType() != ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup; msg = deltas.take()) { + for (GroupCommunicationMessage msg = deltas.take(); + msg.getType() != ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup; + msg = deltas.take()) { LOG.finest(getQualifiedName() + "Got " + msg.getType() + " msg from " + msg.getSrcid()); - if (effectiveTopology == null && msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) { + if (effectiveTopology == null && + msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) { /** * If effectiveTopology!=null, this method is being called from the BaseTopologyUpdateStage * And exception thrown will be caught by uncaughtExceptionHandler leading to System.exit @@ -324,23 +333,28 @@ public class OperatorTopologyImpl implements OperatorTopology { final int srcVersion = msg.getSrcVersion(); switch (msg.getType()) { case UpdateTopology: - sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, selfId, this.version, driverId, + sender.send(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, selfId, this.version, driverId, srcVersion, Utils.EMPTY_BYTE_ARR)); break; case ParentAdd: - sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdded, selfId, this.version, srcId, + sender.send(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdded, selfId, this.version, srcId, srcVersion, Utils.EMPTY_BYTE_ARR), driverId); break; case ParentDead: - sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentRemoved, selfId, this.version, srcId, + sender.send(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentRemoved, selfId, this.version, srcId, srcVersion, Utils.EMPTY_BYTE_ARR), driverId); break; case ChildAdd: - sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdded, selfId, this.version, srcId, + sender.send(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdded, selfId, this.version, srcId, srcVersion, Utils.EMPTY_BYTE_ARR), driverId); break; case ChildDead: - sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildRemoved, selfId, this.version, srcId, + sender.send(Utils.bldVersionedGCM(groupName, operName, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildRemoved, selfId, this.version, srcId, srcVersion, Utils.EMPTY_BYTE_ARR), driverId); break; default: @@ -379,7 +393,8 @@ public class OperatorTopologyImpl implements OperatorTopology { for (final GroupCommunicationMessage msg : deletionDeltasForUpdate) { final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType = msg.getType(); if (msgType == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) { - throw new ParentDeadException(getQualifiedName() + "Parent dead. Current behavior is for the child to die too."); + throw new ParentDeadException(getQualifiedName() + + "Parent dead. Current behavior is for the child to die too."); } } LOG.exiting("OperatorTopologyImpl", "copyDeletionDeltas", Arrays.toString(new Object[]{getQualifiedName(), http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java index 90d3b7c..483ac1b 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java @@ -89,7 +89,8 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { @Override public String toString() { - return "OperatorTopologyStruct - " + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + "(" + selfId + "," + version + ")"; + return "OperatorTopologyStruct - " + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + + "(" + selfId + "," + version + ")"; } @Override @@ -130,7 +131,8 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { @Override public boolean hasChanges() { LOG.entering("OperatorTopologyStructImpl", "hasChanges", getQualifiedName()); - LOG.exiting("OperatorTopologyStructImpl", "hasChanges", Arrays.toString(new Object[]{this.changes, getQualifiedName()})); + LOG.exiting("OperatorTopologyStructImpl", "hasChanges", + Arrays.toString(new Object[]{this.changes, getQualifiedName()})); return this.changes; } @@ -166,11 +168,14 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { } else { retVal = findChild(srcId); } - LOG.exiting("OperatorTopologyStructImpl", "findNode", Arrays.toString(new Object[]{retVal, getQualifiedName(), srcId})); + LOG.exiting("OperatorTopologyStructImpl", "findNode", + Arrays.toString(new Object[]{retVal, getQualifiedName(), srcId})); return retVal; } - private void sendToNode(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, final NodeStruct node) { + private void sendToNode(final byte[] data, + final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, + final NodeStruct node) { LOG.entering("OperatorTopologyStructImpl", "sendToNode", new Object[]{getQualifiedName(), data, msgType, node}); final String nodeId = node.getId(); try { @@ -190,7 +195,8 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { } } - sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId, node.getVersion(), data)); + sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId, node.getVersion(), + data)); if (data.length > SMALL_MSG_LENGTH) { LOG.finest(getQualifiedName() + "Msg too big. Will wait for ACK before queing up one more msg"); @@ -230,8 +236,8 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { LOG.fine(msg); } } - LOG.exiting("OperatorTopologyStructImpl", "receiveFromNode", Arrays.toString(new Object[]{retVal, getQualifiedName(), - node, remove})); + LOG.exiting("OperatorTopologyStructImpl", "receiveFromNode", + Arrays.toString(new Object[]{retVal, getQualifiedName(), node, remove})); return retVal; } @@ -320,8 +326,8 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { childrenToRcvFrom.remove(child.getId()); } final T retVal = retLst.isEmpty() ? null : retLst.get(0); - LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren", Arrays.toString(new Object[]{retVal, getQualifiedName(), - redFunc, dataCodec})); + LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren", + Arrays.toString(new Object[]{retVal, getQualifiedName(), redFunc, dataCodec})); return retVal; } @@ -358,11 +364,10 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { LOG.entering("OperatorTopologyStructImpl", "addedToDeadMsgs", new Object[]{getQualifiedName(), node, msgSrcId, msgSrcVersion}); if (node == null) { - LOG.warning(getQualifiedName() + "Got dead msg when no node existed. OOS Queing up for add to handle"); + LOG.warning(getQualifiedName() + "Got dead msg when no node existed. OOS Queuing up for add to handle"); addToDeadMsgs(msgSrcId, msgSrcVersion); - LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", Arrays.toString(new Object[]{true, getQualifiedName(), - node, msgSrcId, - msgSrcVersion})); + LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", + Arrays.toString(new Object[]{true, getQualifiedName(), node, msgSrcId, msgSrcVersion})); return true; } final int nodeVersion = node.getVersion(); @@ -370,14 +375,12 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { LOG.warning(getQualifiedName() + "Got an OOS dead msg. " + "Has HIGHER ver-" + msgSrcVersion + " than node ver-" + nodeVersion + ". Queing up for add to handle"); addToDeadMsgs(msgSrcId, msgSrcVersion); - LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", Arrays.toString(new Object[]{true, getQualifiedName(), - node, msgSrcId, - msgSrcVersion})); + LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", + Arrays.toString(new Object[]{true, getQualifiedName(), node, msgSrcId, msgSrcVersion})); return true; } - LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", Arrays.toString(new Object[]{false, getQualifiedName(), - node, msgSrcId, - msgSrcVersion})); + LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", + Arrays.toString(new Object[]{false, getQualifiedName(), node, msgSrcId, msgSrcVersion})); return false; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java index d484170..902ccf7 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java @@ -91,7 +91,8 @@ public class ConcurrentCountingMap<K, V> { } public static void main(final String[] args) { - final ConcurrentCountingMap<ReefNetworkGroupCommProtos.GroupCommMessage.Type, String> strMap = new ConcurrentCountingMap<>(); + final ConcurrentCountingMap<ReefNetworkGroupCommProtos.GroupCommMessage.Type, String> strMap = + new ConcurrentCountingMap<>(); LOG.log(Level.INFO, "OUT: {0}", strMap.isEmpty()); strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0"); LOG.log(Level.INFO, "OUT: {0}", strMap); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java index 633c740..5614d0c 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java @@ -34,7 +34,9 @@ public final class Utils { public static GroupCommunicationMessage bldVersionedGCM(final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operName, - final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, final String from, final int srcVersion, + final ReefNetworkGroupCommProtos.GroupCommMessage.Type + msgType, + final String from, final int srcVersion, final String to, final int dstVersion, final byte[]... data) { return new GroupCommunicationMessage(groupName.getName(), operName.getName(), msgType, from, srcVersion, to, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java index db9e740..9b91aa8 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java @@ -118,7 +118,8 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> { final EventHandler<Message<T>> recvHandler, final EventHandler<Exception> exHandler) { this(factory, nsPort, nameServerAddr, nameServerPort, - RETRY_COUNT, RETRY_TIMEOUT, codec, tpFactory, recvHandler, exHandler, LocalAddressProviderFactory.getInstance()); + RETRY_COUNT, RETRY_TIMEOUT, codec, tpFactory, recvHandler, exHandler, + LocalAddressProviderFactory.getInstance()); } /** @@ -136,8 +137,8 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> { final TransportFactory tpFactory, final EventHandler<Message<T>> recvHandler, final EventHandler<Exception> exHandler) { - this(factory, nsPort, nameServerAddr, nameServerPort, retryCount, retryTimeout, codec, tpFactory, recvHandler, exHandler, - LocalAddressProviderFactory.getInstance()); + this(factory, nsPort, nameServerAddr, nameServerPort, retryCount, retryTimeout, codec, tpFactory, recvHandler, + exHandler, LocalAddressProviderFactory.getInstance()); } /** http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java index 13d9281..c79f88a 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java @@ -34,7 +34,8 @@ public class NetworkServiceParameters { } - @NamedParameter(doc = "identifier factory for the service", short_name = "factory", default_class = StringIdentifierFactory.class) + @NamedParameter(doc = "identifier factory for the service", short_name = "factory", + default_class = StringIdentifierFactory.class) public static class NetworkServiceIdentifierFactory implements Name<IdentifierFactory> { } @@ -46,7 +47,8 @@ public class NetworkServiceParameters { public static class NetworkServiceCodec implements Name<Codec<?>> { } - @NamedParameter(doc = "transport factory for the network service", short_name = "nstransportfactory", default_class = MessagingTransportFactory.class) + @NamedParameter(doc = "transport factory for the network service", short_name = "nstransportfactory", + default_class = MessagingTransportFactory.class) public static class NetworkServiceTransportFactory implements Name<TransportFactory> { } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java index c391b67..ecfba79 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java @@ -62,7 +62,8 @@ public final class NameClient implements NameResolver { final int retryCount, final int retryTimeout, final Cache<Identifier, InetSocketAddress> cache) { - this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache, LocalAddressProviderFactory.getInstance()); + this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache, + LocalAddressProviderFactory.getInstance()); } /** http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java index e27ca74..0bcd71b 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java @@ -98,7 +98,8 @@ public class NameLookupClient implements Stage, NamingLookup { final int retryCount, final int retryTimeout, final Cache<Identifier, InetSocketAddress> cache) { - this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache, LocalAddressProviderFactory.getInstance()); + this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, cache, + LocalAddressProviderFactory.getInstance()); } @Deprecated http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java index b23481d..6270722 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java @@ -72,7 +72,8 @@ public class NameRegistryClient implements Stage, NamingRegistry { * @param factory an identifier factory */ public NameRegistryClient( - final String serverAddr, final int serverPort, final IdentifierFactory factory, final LocalAddressProvider localAddressProvider) { + final String serverAddr, final int serverPort, final IdentifierFactory factory, + final LocalAddressProvider localAddressProvider) { this(serverAddr, serverPort, 10000, factory, localAddressProvider); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java index d990aca..8fc7cae 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java @@ -83,7 +83,8 @@ public final class NameServerImpl implements NameServer { injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress()); injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); - injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, new SyncStage<>(new NamingServerHandler(handler, codec))); + injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, + new SyncStage<>(new NamingServerHandler(handler, codec))); try { this.transport = injector.getInstance(NettyMessagingTransport.class); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java index 7fe688a..2fa0cc2 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java @@ -56,7 +56,8 @@ final class NamingCodecFactory { Map<Class<? extends NamingMessage>, Codec<? extends NamingMessage>> clazzToCodecMap = new HashMap<Class<? extends NamingMessage>, Codec<? extends NamingMessage>>(); clazzToCodecMap.put(NamingRegisterRequest.class, new NamingRegisterRequestCodec(factory)); - clazzToCodecMap.put(NamingRegisterResponse.class, new NamingRegisterResponseCodec(new NamingRegisterRequestCodec(factory))); + clazzToCodecMap.put(NamingRegisterResponse.class, + new NamingRegisterResponseCodec(new NamingRegisterRequestCodec(factory))); clazzToCodecMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestCodec(factory)); Codec<NamingMessage> codec = new MultiCodec<NamingMessage>(clazzToCodecMap); return codec; @@ -74,7 +75,8 @@ final class NamingCodecFactory { clazzToCodecMap.put(NamingLookupRequest.class, new NamingLookupRequestCodec(factory)); clazzToCodecMap.put(NamingLookupResponse.class, new NamingLookupResponseCodec(factory)); clazzToCodecMap.put(NamingRegisterRequest.class, new NamingRegisterRequestCodec(factory)); - clazzToCodecMap.put(NamingRegisterResponse.class, new NamingRegisterResponseCodec(new NamingRegisterRequestCodec(factory))); + clazzToCodecMap.put(NamingRegisterResponse.class, + new NamingRegisterResponseCodec(new NamingRegisterRequestCodec(factory))); clazzToCodecMap.put(NamingUnregisterRequest.class, new NamingUnregisterRequestCodec(factory)); Codec<NamingMessage> codec = new MultiCodec<NamingMessage>(clazzToCodecMap); return codec; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java index ee1c848..18a979f 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java @@ -66,7 +66,8 @@ public class NamingRegisterRequestCodec implements Codec<NamingRegisterRequest> */ @Override public NamingRegisterRequest decode(byte[] buf) { - final AvroNamingRegisterRequest avroNamingRegisterRequest = AvroUtils.fromBytes(buf, AvroNamingRegisterRequest.class); + final AvroNamingRegisterRequest avroNamingRegisterRequest = + AvroUtils.fromBytes(buf, AvroNamingRegisterRequest.class); return new NamingRegisterRequest( new NameAssignmentTuple(factory.getNewInstance(avroNamingRegisterRequest.getId().toString()), new InetSocketAddress(avroNamingRegisterRequest.getHost().toString(), avroNamingRegisterRequest.getPort())) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashProbability.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashProbability.java b/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashProbability.java index 7ae940d..bb56109 100644 --- a/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashProbability.java +++ b/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashProbability.java @@ -24,7 +24,8 @@ import org.apache.reef.tang.annotations.NamedParameter; /** * The probability with which a crash will occur. */ -@NamedParameter(doc = "the probability with which a crash will occur.", default_value = "" + CrashProbability.DEFAULT_VALUE) +@NamedParameter(doc = "the probability with which a crash will occur.", + default_value = "" + CrashProbability.DEFAULT_VALUE) public final class CrashProbability implements Name<Double> { public static final double DEFAULT_VALUE = 0.1; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashTimeout.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashTimeout.java b/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashTimeout.java index c70ae34..1602db0 100644 --- a/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashTimeout.java +++ b/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashTimeout.java @@ -21,7 +21,8 @@ package org.apache.reef.poison.params; import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.annotations.NamedParameter; -@NamedParameter(doc = "The time window (in seconds) after ContextStart in which the crash will occur", default_value = "" + CrashTimeout.DEFAULT_VALUE) +@NamedParameter(doc = "The time window (in seconds) after ContextStart in which the crash will occur", + default_value = "" + CrashTimeout.DEFAULT_VALUE) public final class CrashTimeout implements Name<Integer> { public static final int DEFAULT_VALUE = 10; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/HDICLI.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/HDICLI.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/HDICLI.java index 8991fa1..651c4ca 100644 --- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/HDICLI.java +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/HDICLI.java @@ -53,10 +53,14 @@ public final class HDICLI { this.hdInsightInstance = hdInsightInstance; this.logFetcher = logFetcher; final OptionGroup commands = new OptionGroup() - .addOption(OptionBuilder.withArgName(KILL).hasArg().withDescription("Kills the given application.").create(KILL)) - .addOption(OptionBuilder.withArgName(LOGS).hasArg().withDescription("Fetches the logs for the given application.").create(LOGS)) - .addOption(OptionBuilder.withArgName(STATUS).hasArg().withDescription("Fetches the status for the given application.").create(STATUS)) - .addOption(OptionBuilder.withArgName(LIST).withDescription("Lists the application on the cluster.").create(LIST)); + .addOption(OptionBuilder.withArgName(KILL).hasArg() + .withDescription("Kills the given application.").create(KILL)) + .addOption(OptionBuilder.withArgName(LOGS).hasArg() + .withDescription("Fetches the logs for the given application.").create(LOGS)) + .addOption(OptionBuilder.withArgName(STATUS).hasArg() + .withDescription("Fetches the status for the given application.").create(STATUS)) + .addOption(OptionBuilder.withArgName(LIST) + .withDescription("Lists the application on the cluster.").create(LIST)); this.options = new Options().addOptionGroup(commands); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFetcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFetcher.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFetcher.java index c269287..e8c277f 100644 --- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFetcher.java +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFetcher.java @@ -65,7 +65,8 @@ final class LogFetcher { final String accountKey, final String containerName) throws URISyntaxException, InvalidKeyException, StorageException { - final CloudStorageAccount cloudStorageAccount = CloudStorageAccount.parse(getStorageConnectionString(accountName, accountKey)); + final CloudStorageAccount cloudStorageAccount = + CloudStorageAccount.parse(getStorageConnectionString(accountName, accountKey)); final CloudBlobClient blobClient = cloudStorageAccount.createCloudBlobClient(); return blobClient.getContainerReference(containerName); } @@ -98,7 +99,8 @@ final class LogFetcher { } } - private FileStatus[] downloadLogs(final String applicationId) throws StorageException, IOException, URISyntaxException { + private FileStatus[] downloadLogs(final String applicationId) + throws StorageException, IOException, URISyntaxException { final File localFolder = downloadToTempFolder(applicationId); final Path localFolderPath = new Path(localFolder.getAbsolutePath()); return this.fileSystem.listStatus(localFolderPath); @@ -113,7 +115,8 @@ final class LogFetcher { * @throws StorageException * @throws IOException */ - private File downloadToTempFolder(final String applicationId) throws URISyntaxException, StorageException, IOException { + private File downloadToTempFolder(final String applicationId) + throws URISyntaxException, StorageException, IOException { final File outputFolder = Files.createTempDirectory("reeflogs-" + applicationId).toFile(); outputFolder.mkdirs(); final CloudBlobDirectory logFolder = this.container.getDirectoryReference(LOG_FOLDER_PREFIX + applicationId + "/"); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFileEntry.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFileEntry.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFileEntry.java index 311ef94..14839a2 100644 --- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFileEntry.java +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFileEntry.java @@ -97,7 +97,8 @@ final class LogFileEntry { * @param numberOfBytes * @throws IOException */ - private void write(final DataInputStream stream, final Writer outputWriter, final int numberOfBytes) throws IOException { + private void write(final DataInputStream stream, final Writer outputWriter, final int numberOfBytes) + throws IOException { final byte[] buf = new byte[65535]; int lenRemaining = numberOfBytes; while (lenRemaining > 0) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java index a05b93c..155a6cf 100644 --- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java @@ -113,7 +113,8 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler .setCommand(command)); this.hdInsightInstance.submitApplication(applicationSubmission); - LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", applicationID.getApplicationId()); + LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", + applicationID.getApplicationId()); } catch (final IOException ex) { LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex);
