http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/AllocatedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/AllocatedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/AllocatedEvaluatorBridge.java deleted file mode 100644 index ffee177..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/AllocatedEvaluatorBridge.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.bridge.client.events; - -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.bridge.client.IDriverServiceClient; -import org.apache.reef.bridge.client.JVMClientProcess; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.EvaluatorDescriptor; -import org.apache.reef.driver.evaluator.EvaluatorProcess; -import org.apache.reef.tang.Configuration; -import org.apache.reef.util.Optional; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - -/** - * Allocated Evaluator Stub. - */ -@Private -public final class AllocatedEvaluatorBridge implements AllocatedEvaluator { - - private final String evaluatorId; - - private final EvaluatorDescriptor evaluatorDescriptor; - - private final IDriverServiceClient driverServiceClient; - - private final List<File> addFileList = new ArrayList<>(); - - private final List<File> addLibraryList = new ArrayList<>(); - - private JVMClientProcess evaluatorProcess = null; - - public AllocatedEvaluatorBridge( - final String evaluatorId, - final EvaluatorDescriptor evaluatorDescriptor, - final IDriverServiceClient driverServiceClient) { - this.evaluatorId = evaluatorId; - this.evaluatorDescriptor = evaluatorDescriptor; - this.driverServiceClient = driverServiceClient; - } - - @Override - public String getId() { - return this.evaluatorId; - } - - @Override - public void addFile(final File file) { - this.addFileList.add(file); - } - - @Override - public void addLibrary(final File file) { - this.addLibraryList.add(file); - } - - @Override - public EvaluatorDescriptor getEvaluatorDescriptor() { - return this.evaluatorDescriptor; - } - - @Override - public void setProcess(final EvaluatorProcess process) { - if (process instanceof JVMClientProcess) { - this.evaluatorProcess = (JVMClientProcess) process; - } else { - throw new IllegalArgumentException(JVMClientProcess.class.getCanonicalName() + " required."); - } - } - - @Override - public void close() { - this.driverServiceClient.onEvaluatorClose(getId()); - } - - @Override - public void submitTask(final Configuration taskConfiguration) { - this.driverServiceClient.onEvaluatorSubmit( - getId(), - Optional.<Configuration>empty(), - Optional.of(taskConfiguration), - this.evaluatorProcess== null ? - Optional.<JVMClientProcess>empty() : - Optional.of(this.evaluatorProcess), - this.addFileList.size() == 0 ? - Optional.<List<File>>empty() : - Optional.of(this.addFileList), - this.addLibraryList.size() == 0 ? - Optional.<List<File>>empty() : - Optional.of(this.addLibraryList)); - } - - @Override - public void submitContext(final Configuration contextConfiguration) { - - this.driverServiceClient.onEvaluatorSubmit( - getId(), - Optional.of(contextConfiguration), - Optional.<Configuration>empty(), - this.evaluatorProcess== null ? - Optional.<JVMClientProcess>empty() : - Optional.of(this.evaluatorProcess), - this.addFileList.size() == 0 ? - Optional.<List<File>>empty() : - Optional.of(this.addFileList), - this.addLibraryList.size() == 0 ? - Optional.<List<File>>empty() : - Optional.of(this.addLibraryList)); - } - - @Override - public void submitContextAndService( - final Configuration contextConfiguration, - final Configuration serviceConfiguration) { - throw new UnsupportedOperationException(); - } - - @Override - public void submitContextAndTask( - final Configuration contextConfiguration, - final Configuration taskConfiguration) { - - this.driverServiceClient.onEvaluatorSubmit( - getId(), - Optional.of(contextConfiguration), - Optional.of(taskConfiguration), - this.evaluatorProcess== null ? - Optional.<JVMClientProcess>empty() : - Optional.of(this.evaluatorProcess), - this.addFileList.size() == 0 ? - Optional.<List<File>>empty() : - Optional.of(this.addFileList), - this.addLibraryList.size() == 0 ? - Optional.<List<File>>empty() : - Optional.of(this.addLibraryList)); - } - - @Override - public void submitContextAndServiceAndTask( - final Configuration contextConfiguration, - final Configuration serviceConfiguration, - final Configuration taskConfiguration) { - throw new UnsupportedOperationException(); - } - - -}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ClosedContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ClosedContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ClosedContextBridge.java deleted file mode 100644 index d40f052..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ClosedContextBridge.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.events; - -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.ClosedContext; -import org.apache.reef.driver.evaluator.EvaluatorDescriptor; -import org.apache.reef.util.Optional; - -/** - * Closed context bridge. - */ -@Private -public final class ClosedContextBridge implements ClosedContext { - - private final String contextId; - - private final String evaluatorId; - - private final ActiveContext parentContext; - - private final EvaluatorDescriptor evaluatorDescriptor; - - public ClosedContextBridge( - final String contextId, - final String evaluatorId, - final ActiveContext parentContext, - final EvaluatorDescriptor evaluatorDescriptor) { - this.contextId = contextId; - this.evaluatorId = evaluatorId; - this.parentContext = parentContext; - this.evaluatorDescriptor = evaluatorDescriptor; - } - - @Override - public ActiveContext getParentContext() { - return this.parentContext; - } - - @Override - public String getId() { - return this.contextId; - } - - @Override - public String getEvaluatorId() { - return this.evaluatorId; - } - - @Override - public Optional<String> getParentId() { - return Optional.of(this.parentContext.getId()); - } - - @Override - public EvaluatorDescriptor getEvaluatorDescriptor() { - return this.evaluatorDescriptor; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedEvaluatorBridge.java deleted file mode 100644 index 12f6e3b..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedEvaluatorBridge.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.events; - -import org.apache.reef.driver.evaluator.CompletedEvaluator; - -/** - * Completed Evaluator bridge. - */ -public final class CompletedEvaluatorBridge implements CompletedEvaluator { - - private final String id; - - public CompletedEvaluatorBridge(final String id) { - this.id = id; - } - - @Override - public String getId() { - return this.id; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedTaskBridge.java deleted file mode 100644 index bed9129..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/CompletedTaskBridge.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.events; - -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.task.CompletedTask; - -/** - * Completed task bridge. - */ -@Private -public final class CompletedTaskBridge implements CompletedTask { - - private final String taskId; - - private final ActiveContext context; - - private final byte[] result; - - public CompletedTaskBridge( - final String taskId, - final ActiveContext context, - final byte[] result) { - this.taskId = taskId; - this.context = context; - this.result = result; - } - - @Override - public ActiveContext getActiveContext() { - return this.context; - } - - @Override - public String getId() { - return this.taskId; - } - - @Override - public byte[] get() { - return this.result; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ContextMessageBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ContextMessageBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ContextMessageBridge.java deleted file mode 100644 index aea29f6..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ContextMessageBridge.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.events; - -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.driver.context.ContextMessage; - -/** - * Context message bridge. - */ -@Private -public final class ContextMessageBridge implements ContextMessage { - - private final String contextId; - - private final String messageSourceId; - - private final long sequenceNumber; - - private final byte[] message; - - public ContextMessageBridge( - final String contextId, - final String messageSourceId, - final long sequenceNumber, - final byte[] message) { - this.contextId = contextId; - this.messageSourceId = messageSourceId; - this.sequenceNumber = sequenceNumber; - this.message = message; - } - - @Override - public byte[] get() { - return this.message; - } - - @Override - public String getId() { - return this.contextId; - } - - @Override - public String getMessageSourceID() { - return this.messageSourceId; - } - - @Override - public long getSequenceNumber() { - return this.sequenceNumber; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedContextBridge.java deleted file mode 100644 index 45bb8af..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedContextBridge.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.events; - -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.evaluator.EvaluatorDescriptor; -import org.apache.reef.exception.EvaluatorException; -import org.apache.reef.util.Optional; - -/** - * Failed context bridge. - */ -public final class FailedContextBridge implements FailedContext { - - private final String contextId; - - private final String evaluatorId; - - private final String message; - - private final EvaluatorDescriptor evaluatorDescriptor; - - private final Optional<ActiveContext> parentContext; - - private final Optional<byte[]> data; - - public FailedContextBridge( - final String contextId, - final String evaluatorId, - final String message, - final EvaluatorDescriptor evaluatorDescriptor, - final Optional<ActiveContext> parentContext, - final Optional<byte[]> data) { - this.contextId = contextId; - this.evaluatorId = evaluatorId; - this.message = message; - this.evaluatorDescriptor = evaluatorDescriptor; - this.parentContext = parentContext; - this.data = data; - } - - @Override - public Optional<ActiveContext> getParentContext() { - return this.parentContext; - } - - @Override - public String getMessage() { - return this.message; - } - - @Override - public Optional<String> getDescription() { - return Optional.of(message); - } - - @Override - public Optional<Throwable> getReason() { - return Optional.<Throwable>of(new EvaluatorException(this.evaluatorId, this.message)); - } - - @Override - public Optional<byte[]> getData() { - return this.data; - } - - @Override - public Throwable asError() { - return new EvaluatorException(this.evaluatorId, this.message); - } - - @Override - public String getEvaluatorId() { - return this.evaluatorId; - } - - @Override - public Optional<String> getParentId() { - return this.parentContext.isPresent() ? - Optional.of(this.parentContext.get().getId()) : Optional.<String>empty(); - } - - @Override - public EvaluatorDescriptor getEvaluatorDescriptor() { - return this.evaluatorDescriptor; - } - - @Override - public String getId() { - return this.contextId; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedEvaluatorBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedEvaluatorBridge.java deleted file mode 100644 index 40bdc58..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/FailedEvaluatorBridge.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.events; - -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.evaluator.FailedEvaluator; -import org.apache.reef.driver.task.FailedTask; -import org.apache.reef.exception.EvaluatorException; -import org.apache.reef.util.Optional; - -import java.util.List; - -/** - * Failed Evaluator bridge. - */ -@Private -public final class FailedEvaluatorBridge implements FailedEvaluator { - - private final String id; - - private final EvaluatorException evaluatorException; - - private final List<FailedContext> failedContextList; - - private Optional<FailedTask> failedTask; - - public FailedEvaluatorBridge( - final String id, - final EvaluatorException evaluatorException, - final List<FailedContext> failedContextList, - final Optional<FailedTask> failedTask) { - this.id = id; - this.evaluatorException = evaluatorException; - this.failedContextList = failedContextList; - this.failedTask = failedTask; - } - - @Override - public EvaluatorException getEvaluatorException() { - return this.evaluatorException; - } - - @Override - public List<FailedContext> getFailedContextList() { - return this.failedContextList; - } - - @Override - public Optional<FailedTask> getFailedTask() { - return this.failedTask; - } - - @Override - public String getId() { - return this.id; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/RunningTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/RunningTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/RunningTaskBridge.java deleted file mode 100644 index a24c294..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/RunningTaskBridge.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.events; - -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.bridge.client.IDriverServiceClient; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.task.RunningTask; -import org.apache.reef.runtime.common.driver.task.TaskRepresenter; -import org.apache.reef.util.Optional; - -/** - * Running task bridge. - */ -@Private -public final class RunningTaskBridge implements RunningTask { - - private final IDriverServiceClient driverServiceClient; - - private final String taskId; - - private final ActiveContext context; - - - public RunningTaskBridge( - final IDriverServiceClient driverServiceClient, - final String taskId, - final ActiveContext context) { - this.driverServiceClient = driverServiceClient; - this.taskId = taskId; - this.context = context; - } - - @Override - public ActiveContext getActiveContext() { - return this.context; - } - - @Override - public void send(final byte[] message) { - this.driverServiceClient.onTaskMessage(this.taskId, message); - } - - @Override - public void suspend(final byte[] message) { - throw new UnsupportedOperationException("Suspend task not supported"); - } - - @Override - public void suspend() { - throw new UnsupportedOperationException("Suspend task not supported"); - } - - @Override - public void close(final byte[] message) { - this.driverServiceClient.onTaskClose(this.taskId, Optional.of(message)); - } - - @Override - public void close() { - this.driverServiceClient.onTaskClose(this.taskId, Optional.<byte[]>empty()); - } - - @Override - public TaskRepresenter getTaskRepresenter() { - throw new UnsupportedOperationException("Not a public API"); - } - - @Override - public String getId() { - return this.taskId; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/TaskMessageBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/TaskMessageBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/TaskMessageBridge.java deleted file mode 100644 index 625f3cc..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/TaskMessageBridge.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.events; - -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.driver.task.TaskMessage; - -/** - * Task message bridge. - */ -@Private -public final class TaskMessageBridge implements TaskMessage { - - private final String taskId; - - private final String contextId; - - private final String messageSourceId; - - private final long sequenceNumber; - - private final byte[] message; - - public TaskMessageBridge( - final String taskId, - final String contextId, - final String messageSourceId, - final long sequenceNumber, - final byte[] message) { - this.taskId = taskId; - this.contextId = contextId; - this.messageSourceId = messageSourceId; - this.sequenceNumber = sequenceNumber; - this.message = message; - } - - @Override - public byte[] get() { - return this.message; - } - - @Override - public String getId() { - return this.taskId; - } - - @Override - public long getSequenceNumber() { - return this.sequenceNumber; - } - - @Override - public String getContextId() { - return this.contextId; - } - - @Override - public String getMessageSourceID() { - return this.messageSourceId; - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/package-info.java deleted file mode 100644 index 0c9504d..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * REEF event stubs. - */ -package org.apache.reef.bridge.client.events; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientGrpcConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientGrpcConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientGrpcConfiguration.java deleted file mode 100644 index 7c65ce5..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientGrpcConfiguration.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.grpc; - -import org.apache.reef.bridge.client.IDriverClientService; -import org.apache.reef.bridge.client.IDriverServiceClient; -import org.apache.reef.bridge.client.grpc.parameters.DriverServicePort; -import org.apache.reef.tang.formats.ConfigurationModule; -import org.apache.reef.tang.formats.ConfigurationModuleBuilder; -import org.apache.reef.tang.formats.RequiredParameter; - -/** - * Configuration module for Grpc runtime. - */ -public final class DriverClientGrpcConfiguration extends ConfigurationModuleBuilder { - - public static final RequiredParameter<Integer> DRIVER_SERVICE_PORT = new RequiredParameter<>(); - - public static final ConfigurationModule CONF = new DriverClientGrpcConfiguration() - .bindImplementation(IDriverClientService.class, DriverClientService.class) - .bindImplementation(IDriverServiceClient.class, DriverServiceClient.class) - .bindNamedParameter(DriverServicePort.class, DRIVER_SERVICE_PORT) - .build(); - -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientService.java deleted file mode 100644 index 87f2241..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverClientService.java +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.grpc; - -import io.grpc.Server; -import io.grpc.ServerBuilder; -import io.grpc.stub.StreamObserver; -import org.apache.reef.bridge.client.DriverClientDispatcher; -import org.apache.reef.bridge.client.IDriverClientService; -import org.apache.reef.bridge.client.JVMClientProcess; -import org.apache.reef.bridge.client.events.*; -import org.apache.reef.bridge.proto.*; -import org.apache.reef.bridge.proto.Void; -import org.apache.reef.bridge.service.DriverClientException; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.evaluator.EvaluatorDescriptor; -import org.apache.reef.driver.task.FailedTask; -import org.apache.reef.exception.EvaluatorException; -import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptorImpl; -import org.apache.reef.tang.InjectionFuture; -import org.apache.reef.util.Optional; -import org.apache.reef.wake.remote.ports.TcpPortProvider; -import org.apache.reef.wake.time.Clock; -import org.apache.reef.wake.time.event.StartTime; -import org.apache.reef.wake.time.event.StopTime; - -import javax.inject.Inject; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * The driver client service that accepts incoming messages driver service and - * dispatches appropriate objects to the application. - */ -public final class DriverClientService extends DriverClientGrpc.DriverClientImplBase - implements IDriverClientService { - - private static final Logger LOG = Logger.getLogger(DriverClientService.class.getName()); - - private Server server; - - private final InjectionFuture<Clock> clock; - - private final DriverServiceClient driverServiceClient; - - private final TcpPortProvider tcpPortProvider; - - private final InjectionFuture<DriverClientDispatcher> clientDriverDispatcher; - - private final Map<String, AllocatedEvaluatorBridge> evaluatorBridgeMap = new HashMap<>(); - - private final Map<String, ActiveContextBridge> activeContextBridgeMap = new HashMap<>(); - - @Inject - private DriverClientService( - final DriverServiceClient driverServiceClient, - final TcpPortProvider tcpPortProvider, - final InjectionFuture<Clock> clock, - final InjectionFuture<DriverClientDispatcher> clientDriverDispatcher) { - this.driverServiceClient = driverServiceClient; - this.tcpPortProvider = tcpPortProvider; - this.clock = clock; - this.clientDriverDispatcher = clientDriverDispatcher; - } - - @Override - public void start() throws IOException { - for (final Integer port : this.tcpPortProvider) { - try { - this.server = ServerBuilder.forPort(port) - .addService(this) - .build() - .start(); - LOG.info("Driver Client Server started, listening on " + port); - break; - } catch (IOException e) { - LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port); - } - } - if (this.server == null || this.server.isTerminated()) { - throw new IOException("Unable to start gRPC server"); - } - this.driverServiceClient.registerDriverClientService("localhost", this.server.getPort()); - } - - @Override - public void awaitTermination() throws InterruptedException { - if (this.server != null) { - this.server.awaitTermination(); - } - } - - @Override - public void idlenessCheckHandler(final Void request, final StreamObserver<IdleStatus> responseObserver) { - responseObserver.onNext(IdleStatus.newBuilder() - .setReason("DriverClient checking idleness") - .setIsIdle( - !this.clock.get().isIdle() && - this.activeContextBridgeMap.isEmpty() && - this.evaluatorBridgeMap.isEmpty()) - .build()); - responseObserver.onCompleted(); - } - - @Override - public void startHandler(final StartTimeInfo request, final StreamObserver<Void> responseObserver) { - try { - LOG.log(Level.INFO, "StartHandler at time {0}", request.getStartTime()); - final StartTime startTime = new StartTime(request.getStartTime()); - this.clientDriverDispatcher.get().dispatch(startTime); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - @Override - public void stopHandler(final StopTimeInfo request, final StreamObserver<Void> responseObserver) { - try { - LOG.log(Level.INFO, "StopHandler at time {0}", request.getStopTime()); - final StopTime stopTime = new StopTime(request.getStopTime()); - this.clientDriverDispatcher.get().dispatch(stopTime); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - @Override - public void alarmTrigger(final AlarmTriggerInfo request, final StreamObserver<Void> responseObserver) { - try { - LOG.log(Level.INFO, "Alarm Trigger id {0}", request.getAlarmId()); - this.clientDriverDispatcher.get().dispatchAlarm(request.getAlarmId()); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - @Override - public void allocatedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) { - try { - LOG.log(Level.INFO, "Allocated evaluator id {0}", request.getEvaluatorId()); - final AllocatedEvaluatorBridge eval = new AllocatedEvaluatorBridge( - request.getEvaluatorId(), - toEvaluatorDescriptor(request.getDescriptorInfo()), - this.driverServiceClient); - this.evaluatorBridgeMap.put(eval.getId(), eval); - this.clientDriverDispatcher.get().dispatch(eval); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - @Override - public void completedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) { - try { - LOG.log(Level.INFO, "Completed Evaluator id {0}", request.getEvaluatorId()); - this.evaluatorBridgeMap.remove(request.getEvaluatorId()); - this.clientDriverDispatcher.get().dispatch(new CompletedEvaluatorBridge(request.getEvaluatorId())); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - @Override - public void failedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) { - try { - LOG.log(Level.INFO, "Failed Evaluator id {0}", request.getEvaluatorId()); - final AllocatedEvaluatorBridge eval = this.evaluatorBridgeMap.remove(request.getEvaluatorId()); - List<FailedContext> failedContextList = new ArrayList<>(); - if (request.getFailure().getFailedContextsList() != null) { - for (final String failedContextId : request.getFailure().getFailedContextsList()) { - final ActiveContextBridge context = this.activeContextBridgeMap.get(failedContextId); - failedContextList.add(new FailedContextBridge( - context.getId(), - eval.getId(), - request.getFailure().getMessage(), - eval.getEvaluatorDescriptor(), - context.getParentId().isPresent() ? - Optional.<ActiveContext>of(this.activeContextBridgeMap.get(context.getParentId().get())) : - Optional.<ActiveContext>empty(), - Optional.<byte[]>empty())); - } - for (final String failedContextId : request.getFailure().getFailedContextsList()) { - this.activeContextBridgeMap.remove(failedContextId); - } - } - this.clientDriverDispatcher.get().dispatch( - new FailedEvaluatorBridge( - eval.getId(), - new EvaluatorException(request.getEvaluatorId(), request.getFailure().getMessage()), - failedContextList, - request.getFailure().getFailedTaskId() != null ? - Optional.of(new FailedTask( - request.getFailure().getFailedTaskId(), - request.getFailure().getMessage(), - Optional.<String>empty(), - Optional.<Throwable>empty(), - Optional.<byte[]>empty(), - Optional.<ActiveContext>empty())) : - Optional.<FailedTask>empty())); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - @Override - public void activeContextHandler(final ContextInfo request, final StreamObserver<Void> responseObserver) { - try { - LOG.log(Level.INFO, "Active context id {0}", request.getContextId()); - final AllocatedEvaluatorBridge eval = this.evaluatorBridgeMap.get(request.getEvaluatorId()); - final ActiveContextBridge context = new ActiveContextBridge( - this.driverServiceClient, - request.getContextId(), - request.getParentId() != null ? Optional.of(request.getParentId()) : Optional.<String>empty(), - eval.getId(), - eval.getEvaluatorDescriptor()); - this.activeContextBridgeMap.put(context.getId(), context); - this.clientDriverDispatcher.get().dispatch(context); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - @Override - public void closedContextHandler(final ContextInfo request, final StreamObserver<Void> responseObserver) { - if (this.activeContextBridgeMap.containsKey(request.getContextId())) { - LOG.log(Level.INFO, "Closed context id {0}", request.getContextId()); - try { - final ActiveContextBridge context = this.activeContextBridgeMap.remove(request.getContextId()); - this.clientDriverDispatcher.get().dispatch( - new ClosedContextBridge( - context.getId(), - context.getEvaluatorId(), - this.activeContextBridgeMap.get(request.getParentId()), - context.getEvaluatorDescriptor())); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } else { - responseObserver.onError( - new DriverClientException("Unknown context id " + request.getContextId() + " in close")); - } - } - - @Override - public void failedContextHandler(final ContextInfo request, final StreamObserver<Void> responseObserver) { - if (this.activeContextBridgeMap.containsKey(request.getContextId())) { - LOG.log(Level.INFO, "Failed context id {0}", request.getContextId()); - try { - final ActiveContextBridge context = this.activeContextBridgeMap.remove(request.getContextId()); - final Optional<ActiveContext> parent = context.getParentId().isPresent() ? - Optional.<ActiveContext>of(this.activeContextBridgeMap.get(context.getParentId().get())) : - Optional.<ActiveContext>empty(); - final Optional<byte[]> data = request.getException().getData() != null ? - Optional.of(request.getException().getData().toByteArray()) : Optional.<byte[]>empty(); - this.clientDriverDispatcher.get().dispatch( - new FailedContextBridge( - context.getId(), - context.getEvaluatorId(), - request.getException().getMessage(), - context.getEvaluatorDescriptor(), - parent, - data)); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } else { - responseObserver.onError( - new DriverClientException("Unknown context id " + request.getContextId() + " in close")); - } - } - - @Override - public void contextMessageHandler(final ContextMessageInfo request, final StreamObserver<Void> responseObserver) { - if (this.activeContextBridgeMap.containsKey(request.getContextId())) { - LOG.log(Level.INFO, "Message context id {0}", request.getContextId()); - try { - this.clientDriverDispatcher.get().dispatch( - new ContextMessageBridge( - request.getContextId(), - request.getMessageSourceId(), - request.getSequenceNumber(), - request.getPayload().toByteArray())); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } else { - responseObserver.onError( - new DriverClientException("Unknown context id " + request.getContextId() + " in close")); - } - } - - @Override - public void runningTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) { - if (this.activeContextBridgeMap.containsKey(request.getContextId())) { - LOG.log(Level.INFO, "Running task id {0}", request.getTaskId()); - try { - final ActiveContextBridge context = this.activeContextBridgeMap.get(request.getContextId()); - this.clientDriverDispatcher.get().dispatch( - new RunningTaskBridge(this.driverServiceClient, request.getTaskId(), context)); - } finally { - responseObserver.onCompleted(); - } - } else { - responseObserver.onError( - new DriverClientException("Unknown context id: " + request.getContextId())); - } - } - - @Override - public void failedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) { - try { - LOG.log(Level.INFO, "Failed task id {0}", request.getTaskId()); - final Optional<ActiveContext> context = this.activeContextBridgeMap.containsKey(request.getContextId()) ? - Optional.<ActiveContext>of(this.activeContextBridgeMap.get(request.getContextId())) : - Optional.<ActiveContext>empty(); - final Optional<byte[]> data = request.getException().getData() != null ? - Optional.of(request.getException().getData().toByteArray()) : Optional.<byte[]>empty(); - this.clientDriverDispatcher.get().dispatch( - new FailedTask( - request.getTaskId(), - request.getException().getMessage(), - Optional.of(request.getException().getName()), - Optional.<Throwable>of(new EvaluatorException(request.getException().getMessage())), - data, - context)); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - @Override - public void completedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) { - if (this.activeContextBridgeMap.containsKey(request.getContextId())) { - LOG.log(Level.INFO, "Completed task id {0}", request.getTaskId()); - try { - final ActiveContextBridge context = this.activeContextBridgeMap.get(request.getContextId()); - this.clientDriverDispatcher.get().dispatch( - new CompletedTaskBridge( - request.getTaskId(), - context, - request.getResult() != null ? request.getResult().toByteArray() : null)); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } else { - responseObserver.onError( - new DriverClientException("Unknown context id: " + request.getContextId())); - } - } - - @Override - public void suspendedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) { - responseObserver.onError(new DriverClientException("Not supported")); - } - - @Override - public void taskMessageHandler(final TaskMessageInfo request, final StreamObserver<Void> responseObserver) { - if (this.activeContextBridgeMap.containsKey(request.getContextId())) { - LOG.log(Level.INFO, "Message task id {0}", request.getTaskId()); - try { - this.clientDriverDispatcher.get().dispatch( - new TaskMessageBridge( - request.getTaskId(), - request.getContextId(), - request.getMessageSourceId(), - request.getSequenceNumber(), - request.getPayload().toByteArray())); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } else { - responseObserver.onError( - new DriverClientException("Unknown context id: " + request.getContextId())); - } - } - - @Override - public void clientMessageHandler(final ClientMessageInfo request, final StreamObserver<Void> responseObserver) { - LOG.log(Level.INFO, "Client message"); - try { - this.clientDriverDispatcher.get().clientMessageDispatch(request.getPayload().toByteArray()); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - @Override - public void clientCloseHandler(final Void request, final StreamObserver<Void> responseObserver) { - LOG.log(Level.INFO, "Client close"); - try { - this.clientDriverDispatcher.get().clientCloseDispatch(); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - @Override - public void clientCloseWithMessageHandler( - final ClientMessageInfo request, - final StreamObserver<Void> responseObserver) { - LOG.log(Level.INFO, "Client close with message"); - try { - this.clientDriverDispatcher.get().clientCloseWithMessageDispatch(request.getPayload().toByteArray()); - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); - } - } - - // Helper methods - - private EvaluatorDescriptor toEvaluatorDescriptor(final EvaluatorDescriptorInfo info) { - return new EvaluatorDescriptorImpl( - null, - info.getMemory(), - info.getCores(), - new JVMClientProcess(), - info.getRuntimeName()); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverServiceClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverServiceClient.java deleted file mode 100644 index 0bc29ce..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/DriverServiceClient.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.grpc; - -import com.google.protobuf.ByteString; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.bridge.client.IDriverServiceClient; -import org.apache.reef.bridge.client.JVMClientProcess; -import org.apache.reef.bridge.client.grpc.parameters.DriverServicePort; -import org.apache.reef.bridge.proto.*; -import org.apache.reef.driver.evaluator.EvaluatorRequest; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.tang.formats.ConfigurationSerializer; -import org.apache.reef.util.Optional; - -import javax.inject.Inject; -import java.io.File; -import java.util.List; - -/** - * The client that exposes methods for communicating back to the - * driver service. - */ -@Private -public final class DriverServiceClient implements IDriverServiceClient { - - /** Used for the evaluator configuration, which is not needed in Java. */ - private static final Configuration EMPTY_CONF = - Tang.Factory.getTang().newConfigurationBuilder().build(); - - private final ConfigurationSerializer configurationSerializer; - - private final DriverServiceGrpc.DriverServiceBlockingStub serviceStub; - - @Inject - private DriverServiceClient( - final ConfigurationSerializer configurationSerializer, - @Parameter(DriverServicePort.class) final Integer driverServicePort) { - this.configurationSerializer = configurationSerializer; - final ManagedChannel channel = ManagedChannelBuilder - .forAddress("localhost", driverServicePort) - .usePlaintext(true) - .build(); - this.serviceStub = DriverServiceGrpc.newBlockingStub(channel); - } - - public void registerDriverClientService(final String host, final int port) { - this.serviceStub.registerDriverClient( - DriverClientRegistration.newBuilder() - .setHost(host) - .setPort(port) - .build()); - } - - @Override - public void onShutdown() { - this.serviceStub.shutdown(ShutdownRequest.newBuilder().build()); - } - - @Override - public void onShutdown(final Throwable ex) { - this.serviceStub.shutdown(ShutdownRequest.newBuilder() - .setException(ExceptionInfo.newBuilder() - .setName(ex.getCause() != null ? ex.getCause().toString() : ex.toString()) - .setMessage(ex.getMessage()) - .build()) - .build()); - } - - @Override - public void onSetAlarm(final String alarmId, final int timeoutMS) { - this.serviceStub.setAlarm( - AlarmRequest.newBuilder() - .setAlarmId(alarmId) - .setTimeoutMs(timeoutMS) - .build()); - } - - @Override - public void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest) { - this.serviceStub.requestResources( - ResourceRequest.newBuilder() - .setCores(evaluatorRequest.getNumberOfCores()) - .setMemorySize(evaluatorRequest.getMegaBytes()) - .setRelaxLocality(evaluatorRequest.getRelaxLocality()) - .setResourceCount(evaluatorRequest.getNumber()) - .setRuntimeName(evaluatorRequest.getRuntimeName()) - .addAllRackNameList(evaluatorRequest.getRackNames()) - .addAllNodeNameList(evaluatorRequest.getNodeNames()) - .build()); - } - - @Override - public void onEvaluatorClose(final String evalautorId) { - this.serviceStub.allocatedEvaluatorOp( - AllocatedEvaluatorRequest.newBuilder() - .setEvaluatorId(evalautorId) - .setCloseEvaluator(true) - .build()); - } - - @Override - public void onEvaluatorSubmit( - final String evaluatorId, - final Optional<Configuration> contextConfiguration, - final Optional<Configuration> taskConfiguration, - final Optional<JVMClientProcess> evaluatorProcess, - final Optional<List<File>> addFileList, - final Optional<List<File>> addLibraryList) { - final AllocatedEvaluatorRequest.Builder builder = - AllocatedEvaluatorRequest.newBuilder().setEvaluatorId(evaluatorId); - if (addFileList.isPresent()) { - for (final File file : addFileList.get()) { - builder.addAddFiles(file.getAbsolutePath()); - } - } - if (addLibraryList.isPresent()) { - for (final File file : addLibraryList.get()) { - builder.addAddLibraries(file.getAbsolutePath()); - } - } - if (evaluatorProcess.isPresent()) { - final JVMClientProcess rawEP = evaluatorProcess.get(); - builder.setSetProcess( - AllocatedEvaluatorRequest.EvaluatorProcessRequest.newBuilder() - .setConfigurationFileName(rawEP.getConfigurationFileName()) - .setMemoryMb(rawEP.getMemory()) - .setStandardOut(rawEP.getStandardOut()) - .setStandardErr(rawEP.getStandardErr()) - .addAllOptions(rawEP.getOptions()) - .build()); - } - if (contextConfiguration.isPresent()) { - builder.setContextConfiguration( - this.configurationSerializer.toString(contextConfiguration.get())); - } - if (taskConfiguration.isPresent()) { - builder.setTaskConfiguration( - this.configurationSerializer.toString(taskConfiguration.get())); - } - this.serviceStub.allocatedEvaluatorOp(builder.build()); - } - - // Context Operations - - @Override - public void onContextClose(final String contextId) { - this.serviceStub.activeContextOp( - ActiveContextRequest.newBuilder() - .setContextId(contextId) - .setCloseContext(true) - .build()); - } - - @Override - public void onContextSubmitContext( - final String contextId, - final Configuration contextConfiguration) { - this.serviceStub.activeContextOp( - ActiveContextRequest.newBuilder() - .setContextId(contextId) - .setNewContextRequest(this.configurationSerializer.toString(contextConfiguration)) - .build()); - } - - @Override - public void onContextSubmitTask( - final String contextId, - final Configuration taskConfiguration) { - this.serviceStub.activeContextOp( - ActiveContextRequest.newBuilder() - .setContextId(contextId) - .setNewTaskRequest(this.configurationSerializer.toString(taskConfiguration)) - .build()); - } - - @Override - public void onContextMessage(final String contextId, final byte[] message) { - this.serviceStub.activeContextOp( - ActiveContextRequest.newBuilder() - .setContextId(contextId) - .setMessage(ByteString.copyFrom(message)) - .build()); - } - - // Task operations - - @Override - public void onTaskClose(final String taskId, final Optional<byte[]> message) { - this.serviceStub.runningTaskOp(RunningTaskRequest.newBuilder() - .setTaskId(taskId) - .setCloseTask(true) - .setMessage(message.isPresent() ? ByteString.copyFrom(message.get()) : null) - .build()); - } - - @Override - public void onTaskMessage(final String taskId, final byte[] message) { - this.serviceStub.runningTaskOp(RunningTaskRequest.newBuilder() - .setTaskId(taskId) - .setMessage(ByteString.copyFrom(message)) - .build()); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/package-info.java deleted file mode 100644 index 63518d0..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * gRPC specific implementations of the driver client bridge. - */ -package org.apache.reef.bridge.client.grpc; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/DriverServicePort.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/DriverServicePort.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/DriverServicePort.java deleted file mode 100644 index f8ac2d6..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/DriverServicePort.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.bridge.client.grpc.parameters; - -import org.apache.reef.tang.annotations.Name; -import org.apache.reef.tang.annotations.NamedParameter; - -/** - * gRPC driver service port. - */ -@NamedParameter(doc = "Driver Service Grpc port", short_name = "driver-service-port") -public final class DriverServicePort implements Name<Integer> { -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/package-info.java deleted file mode 100644 index 750eda0..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/grpc/parameters/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * gRPC specific parameters. - */ -package org.apache.reef.bridge.client.grpc.parameters; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/LocalDriverServiceRuntimeLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/LocalDriverServiceRuntimeLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/LocalDriverServiceRuntimeLauncher.java new file mode 100644 index 0000000..9550a0c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/LocalDriverServiceRuntimeLauncher.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.launch; + +import org.apache.reef.bridge.client.IDriverRuntimeConfigurationProvider; +import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider; +import org.apache.reef.bridge.client.IDriverServiceRuntimeLauncher; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.tang.exceptions.InjectionException; + +import javax.inject.Inject; + +/** + * Local driver service launcher. + */ +public final class LocalDriverServiceRuntimeLauncher implements IDriverServiceRuntimeLauncher { + + private final IDriverRuntimeConfigurationProvider driverRuntimeConfigurationProvider; + + private final IDriverServiceConfigurationProvider driverServiceConfigurationProvider; + @Inject + private LocalDriverServiceRuntimeLauncher( + final IDriverRuntimeConfigurationProvider driverRuntimeConfigurationProvider, + final IDriverServiceConfigurationProvider driverServiceConfigurationProvider) { + this.driverRuntimeConfigurationProvider = driverRuntimeConfigurationProvider; + this.driverServiceConfigurationProvider = driverServiceConfigurationProvider; + } + + @Override + public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { + try { + DriverLauncher.getLauncher( + driverRuntimeConfigurationProvider.getConfiguration(driverClientConfiguration)) + .run(driverServiceConfigurationProvider.getConfiguration(driverClientConfiguration)); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/YarnDriverServiceRuntimeLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/YarnDriverServiceRuntimeLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/YarnDriverServiceRuntimeLauncher.java new file mode 100644 index 0000000..a2f4039 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/YarnDriverServiceRuntimeLauncher.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.launch; + +import org.apache.reef.bridge.client.IDriverRuntimeConfigurationProvider; +import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider; +import org.apache.reef.bridge.client.IDriverServiceRuntimeLauncher; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.tang.exceptions.InjectionException; + +import javax.inject.Inject; + +/** + * Yarn driver service launcher. + */ +public final class YarnDriverServiceRuntimeLauncher implements IDriverServiceRuntimeLauncher { + + private final IDriverRuntimeConfigurationProvider driverRuntimeConfigurationProvider; + + private final IDriverServiceConfigurationProvider driverServiceConfigurationProvider; + + @Inject + private YarnDriverServiceRuntimeLauncher( + final IDriverRuntimeConfigurationProvider driverRuntimeConfigurationProvider, + final IDriverServiceConfigurationProvider driverServiceConfigurationProvider) { + this.driverRuntimeConfigurationProvider = driverRuntimeConfigurationProvider; + this.driverServiceConfigurationProvider = driverServiceConfigurationProvider; + } + + @Override + public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { + try { + DriverLauncher.getLauncher( + driverRuntimeConfigurationProvider.getConfiguration(driverClientConfiguration)) + .run(driverServiceConfigurationProvider.getConfiguration(driverClientConfiguration)); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/package-info.java new file mode 100644 index 0000000..bc89ef7 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Client bridge runtime specific launchers. + */ +package org.apache.reef.bridge.client.launch; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/package-info.java index 0da2369..b0793e3 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/package-info.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/package-info.java @@ -17,6 +17,6 @@ * under the License. */ /** - * Java bridge client driver. + * Client bridge. */ package org.apache.reef.bridge.client; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/ClientDriverStopHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/ClientDriverStopHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/ClientDriverStopHandler.java deleted file mode 100644 index 9d16c80..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/ClientDriverStopHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.parameters; - -import org.apache.reef.bridge.client.DefaultDriverClientStopHandler; -import org.apache.reef.tang.annotations.Name; -import org.apache.reef.tang.annotations.NamedParameter; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.time.event.StopTime; - -import java.util.Set; - -/** - * Client driver stop handler. - */ -@NamedParameter(doc ="Java driver client stop handler", - default_class = DefaultDriverClientStopHandler.class) -public final class ClientDriverStopHandler implements Name<Set<EventHandler<StopTime>>> { -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/DriverClientDispatchThreadCount.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/DriverClientDispatchThreadCount.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/DriverClientDispatchThreadCount.java deleted file mode 100644 index b0adf1c..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/DriverClientDispatchThreadCount.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.client.parameters; - -import org.apache.reef.tang.annotations.Name; -import org.apache.reef.tang.annotations.NamedParameter; - -/** - * Driver client dispatcher thread count. - */ -@NamedParameter(doc = "Number of dispatch threads", default_value = "1") -public class DriverClientDispatchThreadCount implements Name<Integer> { -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/package-info.java deleted file mode 100644 index b39fce9..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/parameters/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Java bridge driver client specific parameters. - */ -package org.apache.reef.bridge.client.parameters; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/LocalDriverRuntimeConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/LocalDriverRuntimeConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/LocalDriverRuntimeConfigurationProvider.java new file mode 100644 index 0000000..0f8e3a8 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/LocalDriverRuntimeConfigurationProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.runtime; + +import org.apache.commons.lang.StringUtils; +import org.apache.reef.bridge.client.IDriverRuntimeConfigurationProvider; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.formats.ConfigurationModule; + +import javax.inject.Inject; + +/** + * Local driver runtime configuration provider for the bridge. + */ +public final class LocalDriverRuntimeConfigurationProvider implements IDriverRuntimeConfigurationProvider { + + @Inject + LocalDriverRuntimeConfigurationProvider() { + } + + @Override + public Configuration getConfiguration(final ClientProtocol.DriverClientConfiguration driverConfiguration) { + ConfigurationModule localRuntimeCM = LocalRuntimeConfiguration.CONF; + if (driverConfiguration.getLocalRuntime().getMaxNumberOfEvaluators() > 0) { + localRuntimeCM = localRuntimeCM.set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, + driverConfiguration.getLocalRuntime().getMaxNumberOfEvaluators()); + } + if (StringUtils.isNotEmpty(driverConfiguration.getLocalRuntime().getRuntimeRootFolder())) { + localRuntimeCM = localRuntimeCM.set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, + driverConfiguration.getLocalRuntime().getRuntimeRootFolder()); + } + if (driverConfiguration.getLocalRuntime().getJvmHeapSlack() > 0.0) { + localRuntimeCM = localRuntimeCM.set(LocalRuntimeConfiguration.JVM_HEAP_SLACK, + driverConfiguration.getLocalRuntime().getJvmHeapSlack()); + } + return localRuntimeCM.build(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/YarnDriverRuntimeConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/YarnDriverRuntimeConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/YarnDriverRuntimeConfigurationProvider.java new file mode 100644 index 0000000..ad7bd7d --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/YarnDriverRuntimeConfigurationProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.runtime; + +import org.apache.commons.lang.StringUtils; +import org.apache.reef.bridge.client.IDriverRuntimeConfigurationProvider; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; +import org.apache.reef.runtime.yarn.driver.RuntimeIdentifier; +import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration; +import org.apache.reef.runtime.yarn.driver.parameters.FileSystemUrl; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.JavaConfigurationBuilder; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.formats.ConfigurationModule; + +import javax.inject.Inject; + +/** + * Yarn driver runtime configuration provider for the bridge. + */ +public final class YarnDriverRuntimeConfigurationProvider implements IDriverRuntimeConfigurationProvider { + + @Inject + YarnDriverRuntimeConfigurationProvider() { + } + + @Override + public Configuration getConfiguration(final ClientProtocol.DriverClientConfiguration driverConfiguration) { + ConfigurationModule yarnDriverConfiguration = YarnDriverConfiguration.CONF + .set(YarnDriverConfiguration.JOB_IDENTIFIER, driverConfiguration.getJobid()) + .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE) + .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0) + .set(YarnDriverConfiguration.RUNTIME_NAMES, RuntimeIdentifier.RUNTIME_NAME); + if (StringUtils.isNotEmpty(driverConfiguration.getYarnRuntime().getJobSubmissionDirectory())) { + yarnDriverConfiguration = yarnDriverConfiguration + .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, + driverConfiguration.getYarnRuntime().getJobSubmissionDirectory()); + } + if (StringUtils.isNotEmpty(driverConfiguration.getYarnRuntime().getFilesystemUrl())) { + JavaConfigurationBuilder providerConfig = Tang.Factory.getTang().newConfigurationBuilder() + .bindNamedParameter(FileSystemUrl.class, driverConfiguration.getYarnRuntime().getFilesystemUrl()); + return Configurations.merge(yarnDriverConfiguration.build(), providerConfig.build()); + } else { + return yarnDriverConfiguration.build(); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/package-info.java new file mode 100644 index 0000000..24dc42c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Client bridge. + */ +package org.apache.reef.bridge.client.runtime;
