Repository: incubator-reef Updated Branches: refs/heads/master 26156627e -> 15dd78698
[REEF-393]: OutputService for writing files to filesystem This addressed the issue by * implementing OutputService. * implementing an example application using OutputService. JIRA: [REEF-393](https://issues.apache.org/jira/browse/REEF-393) Pull Request: This closes #242 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/15dd7869 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/15dd7869 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/15dd7869 Branch: refs/heads/master Commit: 15dd786988d9cfb5d456a0c67fb3d92ab9e02661 Parents: 2615662 Author: Kijung Shin <[email protected]> Authored: Thu Jun 25 17:02:51 2015 +0900 Committer: Markus Weimer <[email protected]> Committed: Thu Jul 2 12:45:50 2015 -0700 ---------------------------------------------------------------------- .../data/output/OutputServiceDriver.java | 122 +++++++++++++ .../examples/data/output/OutputServiceREEF.java | 171 +++++++++++++++++++ .../examples/data/output/OutputServiceTask.java | 64 +++++++ .../reef/examples/data/output/package-info.java | 24 +++ .../reef/io/data/output/OutputService.java | 34 ++++ .../io/data/output/OutputStreamProvider.java | 45 +++++ .../reef/io/data/output/TaskOutputService.java | 126 ++++++++++++++ .../data/output/TaskOutputServiceBuilder.java | 46 +++++ .../data/output/TaskOutputStreamProvider.java | 48 ++++++ .../output/TaskOutputStreamProviderHDFS.java | 83 +++++++++ .../output/TaskOutputStreamProviderLocal.java | 78 +++++++++ .../reef/io/data/output/package-info.java | 26 +++ 12 files changed, 867 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java new file mode 100644 index 0000000..50fb86c --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceDriver.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.data.output; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ContextConfiguration; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.driver.task.TaskConfiguration; +import org.apache.reef.io.data.output.OutputService; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StartTime; + +import javax.inject.Inject; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The Driver code for the output service demo app. + */ +@Unit +public final class OutputServiceDriver { + private static final Logger LOG = Logger.getLogger(OutputServiceDriver.class.getName()); + + /** + * Evaluator requestor object used to create new evaluator containers. + */ + private final EvaluatorRequestor requestor; + + /** + * Output service object. + */ + private final OutputService outputService; + + /** + * Sub-id for Tasks. + * This object grants different IDs to each task + * e.g. Task-0, Task-1, and so on. + */ + private final AtomicInteger taskId = new AtomicInteger(0); + + /** + * Job driver constructor - instantiated via TANG. + * + * @param requestor evaluator requestor object used to create new evaluator containers. + * @param outputService output service object. + */ + @Inject + public OutputServiceDriver(final EvaluatorRequestor requestor, + final OutputService outputService) { + LOG.log(Level.FINE, "Instantiated 'OutputServiceDriver'"); + this.requestor = requestor; + this.outputService = outputService; + } + + /** + * Handles the StartTime event: Request three Evaluators. 
+ */ + public final class StartHandler implements EventHandler<StartTime> { + @Override + public void onNext(final StartTime startTime) { + OutputServiceDriver.this.requestor.submit(EvaluatorRequest.newBuilder() + .setNumber(3) + .setMemory(64) + .setNumberOfCores(1) + .build()); + LOG.log(Level.INFO, "Requested Evaluator."); + } + } + + /** + * Handles AllocatedEvaluator: Submit the output service and a context for it. + */ + public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { + @Override + public void onNext(final AllocatedEvaluator allocatedEvaluator) { + LOG.log(Level.INFO, "Submitting Output Service to AllocatedEvaluator: {0}", allocatedEvaluator); + final Configuration contextConfiguration = ContextConfiguration.CONF + .set(ContextConfiguration.IDENTIFIER, "OutputServiceContext") + .build(); + allocatedEvaluator.submitContextAndService( + contextConfiguration, outputService.getServiceConfiguration()); + } + } + + /** + * Handles ActiveContext: Submit the output service demo task. 
+ */ + public final class ActiveContextHandler implements EventHandler<ActiveContext> { + @Override + public void onNext(final ActiveContext activeContext) { + LOG.log(Level.INFO, + "Submitting OutputServiceREEF task to AllocatedEvaluator: {0}", + activeContext.getEvaluatorDescriptor()); + final Configuration taskConfiguration = TaskConfiguration.CONF + .set(TaskConfiguration.IDENTIFIER, "Task-" + taskId.getAndIncrement()) + .set(TaskConfiguration.TASK, OutputServiceTask.class) + .build(); + activeContext.submitTask(taskConfiguration); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java new file mode 100644 index 0000000..139a8d2 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceREEF.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.data.output; + +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.io.data.output.*; +import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; +import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.JavaConfigurationBuilder; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.CommandLine; +import org.apache.reef.util.EnvironmentUtils; + +import java.io.File; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Client for the output service demo app. 
+ */ +@ClientSide +public final class OutputServiceREEF { + private static final Logger LOG = Logger.getLogger(OutputServiceREEF.class.getName()); + + public static void main(final String[] args) + throws InjectionException, BindException, IOException { + + final Tang tang = Tang.Factory.getTang(); + final JavaConfigurationBuilder cb = tang.newConfigurationBuilder(); + new CommandLine(cb) + .registerShortNameOfClass(Local.class) + .registerShortNameOfClass(TimeOut.class) + .registerShortNameOfClass(OutputDir.class) + .processCommandLine(args); + + final Injector injector = tang.newInjector(cb.build()); + final boolean isLocal = injector.getNamedInstance(Local.class); + final String outputDir = injector.getNamedInstance(OutputDir.class); + final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000; + + final Configuration driverConf = getDriverConf(); + final Configuration outputServiceConf = getOutputServiceConf(isLocal, outputDir); + final Configuration submittedConfiguration = Tang.Factory.getTang() + .newConfigurationBuilder(driverConf, outputServiceConf) + .build(); + final LauncherStatus state = DriverLauncher.getLauncher(getRuntimeConf(isLocal)) + .run(submittedConfiguration, jobTimeout); + + LOG.log(Level.INFO, "REEF job completed: {0}", state); + } + + /** + * @param isLocal true for local runtime, or false for YARN runtime. + * @return The runtime configuration + */ + private static Configuration getRuntimeConf(final boolean isLocal) { + final Configuration runtimeConf; + if (isLocal) { + LOG.log(Level.INFO, "Running the output service demo on the local runtime"); + runtimeConf = LocalRuntimeConfiguration.CONF + .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 3) + .build(); + } else { + LOG.log(Level.INFO, "Running the output service demo on YARN"); + runtimeConf = YarnClientConfiguration.CONF.build(); + } + return runtimeConf; + } + + /** + * @return The Driver configuration. 
+ */ + private static Configuration getDriverConf() { + final Configuration driverConf = DriverConfiguration.CONF + .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(OutputServiceDriver.class)) + .set(DriverConfiguration.DRIVER_IDENTIFIER, "OutputServiceREEF") + .set(DriverConfiguration.ON_DRIVER_STARTED, OutputServiceDriver.StartHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, OutputServiceDriver.EvaluatorAllocatedHandler.class) + .set(DriverConfiguration.ON_CONTEXT_ACTIVE, OutputServiceDriver.ActiveContextHandler.class) + .build(); + + return driverConf; + } + + /** + * @param isLocal true for local runtime, or false for YARN runtime. + * @param outputDir path of the output directory. + * @return The configuration to use OutputService + */ + private static Configuration getOutputServiceConf(final boolean isLocal, final String outputDir) { + final Configuration outputServiceConf; + if (isLocal) { + outputServiceConf = TaskOutputServiceBuilder.CONF + .set(TaskOutputServiceBuilder.TASK_OUTPUT_STREAM_PROVIDER, TaskOutputStreamProviderLocal.class) + .set(TaskOutputServiceBuilder.OUTPUT_PATH, getAbsolutePath(outputDir)) + .build(); + } else { + outputServiceConf = TaskOutputServiceBuilder.CONF + .set(TaskOutputServiceBuilder.TASK_OUTPUT_STREAM_PROVIDER, TaskOutputStreamProviderHDFS.class) + .set(TaskOutputServiceBuilder.OUTPUT_PATH, outputDir) + .build(); + } + return outputServiceConf; + } + + /** + * transform the given relative path into the absolute path based on the current directory where a user runs the demo. + * @param relativePath relative path + * @return absolute path + */ + private static String getAbsolutePath(final String relativePath) { + final File outputFile = new File(relativePath); + return outputFile.getAbsolutePath(); + } + + /** + * Command line parameter = true to run locally, or false to run on YARN. 
+ */ + @NamedParameter(doc = "Whether or not to run on the local runtime", + short_name = "local", default_value = "true") + public static final class Local implements Name<Boolean> { + } + + /** + * Command line parameter = number of minutes before timeout. + */ + @NamedParameter(doc = "Number of minutes before timeout", + short_name = "timeout", default_value = "2") + public static final class TimeOut implements Name<Integer> { + } + + /** + * Command line parameter = path of the output directory. + */ + @NamedParameter(doc = "Path of the output directory", + short_name = "output") + public static final class OutputDir implements Name<String> { + } + + /** + * Empty private constructor to prohibit instantiation of utility class. + */ + private OutputServiceREEF() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java new file mode 100644 index 0000000..b9ae0ea --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/OutputServiceTask.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.data.output; + +import org.apache.reef.io.data.output.OutputStreamProvider; +import org.apache.reef.task.Task; + +import javax.inject.Inject; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * The Task code for the output service demo app. + * This task receives an output stream from the output service + * and writes "Hello REEF!" on it. + */ +public final class OutputServiceTask implements Task { + + /** + * Output stream provider object through which tasks create output streams. + */ + private final OutputStreamProvider outputStreamProvider; + + /** + * Task constructor - instantiated via TANG. + * + * @param outputStreamProvider Output stream provider object through which tasks create output streams. + */ + @Inject + public OutputServiceTask(final OutputStreamProvider outputStreamProvider) { + this.outputStreamProvider = outputStreamProvider; + } + + /** + * Receives an output stream from the output service and writes "Hello REEF!" on it. + * + * @param memento the memento object passed down by the driver.
+ * @return null + * @throws java.io.IOException + */ + @Override + public byte[] call(final byte[] memento) throws IOException { + try (final DataOutputStream outputStream = outputStreamProvider.create("hello")) { + outputStream.writeBytes("Hello REEF!"); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java new file mode 100644 index 0000000..f9c7371 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/data/output/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Example application using the output service. + * Three evaluators are allocated, and three tasks running on them + * write outputs using the output service. 
+ */ +package org.apache.reef.examples.data.output; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java new file mode 100644 index 0000000..c81690d --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputService.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.data.output; + +import org.apache.reef.tang.Configuration; + +/** + * All output services should implement this interface. + */ +public interface OutputService { + + /** + * Provides a service configuration for the output service. + * + * @return service configuration. 
+ */ + Configuration getServiceConfiguration(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java new file mode 100644 index 0000000..62b3283 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/OutputStreamProvider.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.data.output; + +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * A provider through which users create output streams. + */ +public interface OutputStreamProvider { + + /** + * create an output stream using the given name. 
+ * + * @param name name of the created output stream + * It is used as the name of the directory if the created output stream is a file output stream + * @return created output stream + * @throws java.io.IOException + */ + DataOutputStream create(final String name) throws IOException; + + /** + * release resources. + * + * @throws java.io.IOException + */ + void close() throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java new file mode 100644 index 0000000..6388f77 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputService.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.data.output; + +import org.apache.reef.driver.context.ServiceConfiguration; +import org.apache.reef.evaluator.context.events.ContextStop; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.task.events.TaskStart; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A service class of the task output service. + * The task output service provides an output stream, + * through which tasks write their output to a file + * without considering the current runtime + * or collisions with other tasks. + */ +@Unit +public final class TaskOutputService implements OutputService { + private static final Logger LOG = Logger.getLogger(TaskOutputService.class.getName()); + + /** + * Output stream provider object through which tasks create output streams. + */ + private final TaskOutputStreamProvider taskOutputStreamProvider; + + /** + * Path of the directory where output files are created. + */ + private final String outputPath; + + /** + * Service constructor - instantiated via TANG. + * + * @param taskOutputStreamProvider Output stream provider object through which tasks create file output streams. + * @param outputPath Path of the directory where output files are created. + */ + @Inject + private TaskOutputService( + final TaskOutputStreamProvider taskOutputStreamProvider, + @Parameter(OutputPath.class) final String outputPath) { + this.taskOutputStreamProvider = taskOutputStreamProvider; + this.outputPath = outputPath; + } + + /** + * Provides a service configuration for the output service. + * + * @return service configuration.
+ */ + @Override + public Configuration getServiceConfiguration() { + + final Configuration partialServiceConf = ServiceConfiguration.CONF + .set(ServiceConfiguration.SERVICES, taskOutputStreamProvider.getClass()) + .set(ServiceConfiguration.ON_CONTEXT_STOP, ContextStopHandler.class) + .set(ServiceConfiguration.ON_TASK_STARTED, TaskStartHandler.class) + .build(); + + return Tang.Factory.getTang() + .newConfigurationBuilder(partialServiceConf) + .bindImplementation(OutputStreamProvider.class, taskOutputStreamProvider.getClass()) + .bindImplementation(TaskOutputStreamProvider.class, taskOutputStreamProvider.getClass()) + .bindNamedParameter(OutputPath.class, outputPath) + .build(); + } + + /** + * Handles the ContextStop event: Close the output stream provider. + */ + private final class ContextStopHandler implements EventHandler<ContextStop> { + @Override + public void onNext(final ContextStop contextStop) { + LOG.log(Level.INFO, "Context stopped, close the OutputStreamProvider."); + try { + taskOutputStreamProvider.close(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Handles the TaskStart event: Set the task id to the output stream provider. + */ + private final class TaskStartHandler implements EventHandler<TaskStart> { + @Override + public void onNext(final TaskStart taskStart) { + LOG.log(Level.INFO, String.format("Task %s started, create the OutputStreamProvider.", taskStart.getId())); + taskOutputStreamProvider.setTaskId(taskStart.getId()); + } + } + + /** + * Path of the directory where output files are created. 
+ */ + @NamedParameter(doc = "Path of the directory where output files are created") + public static final class OutputPath implements Name<String> { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java new file mode 100644 index 0000000..d423e5b --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputServiceBuilder.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.data.output; + +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.RequiredImpl; +import org.apache.reef.tang.formats.RequiredParameter; + +/** + * Builder to create a TaskOutputService object. 
+ */ +public final class TaskOutputServiceBuilder extends ConfigurationModuleBuilder { + + /** + * A provider through which users create task output streams. + */ + public static final RequiredImpl<TaskOutputStreamProvider> TASK_OUTPUT_STREAM_PROVIDER = new RequiredImpl<>(); + + /** + * Path of the directory where output files are created. + */ + public static final RequiredParameter<String> OUTPUT_PATH = new RequiredParameter<>(); + + public static final ConfigurationModule CONF = new TaskOutputServiceBuilder() + .bindImplementation(OutputService.class, TaskOutputService.class) + .bindImplementation(TaskOutputStreamProvider.class, TASK_OUTPUT_STREAM_PROVIDER) + .bindNamedParameter(TaskOutputService.OutputPath.class, OUTPUT_PATH) + .build(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java new file mode 100644 index 0000000..f3b9d0b --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProvider.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.data.output; + +/** + * A provider through which users create task output streams. + */ +public abstract class TaskOutputStreamProvider implements OutputStreamProvider { + + /** + * id of the current task. + */ + private String taskId; + + /** + * set the id of the current task. + * + * @param taskId id of the current task + */ + protected final void setTaskId(final String taskId) { + this.taskId = taskId; + } + + /** + * get the id of the current task. + * + * @return id of the current task + */ + protected final String getTaskId() { + return this.taskId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java new file mode 100644 index 0000000..e322084 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderHDFS.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.data.output; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.reef.annotations.audience.TaskSide; +import org.apache.reef.tang.annotations.Parameter; + +import javax.inject.Inject; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Implementation of {@link TaskOutputStreamProvider}. + * It provides FileOutputStreams on HDFS. + */ +@TaskSide +public final class TaskOutputStreamProviderHDFS extends TaskOutputStreamProvider { + + /** + * Path of the output directory on HDFS to write outputs. + */ + private final String outputPath; + + /** + * HDFS File system. + */ + private FileSystem fs; + + /** + * Constructor - instantiated via TANG. + * + * @param outputPath path of the output directory on HDFS to write outputs. + */ + @Inject + private TaskOutputStreamProviderHDFS( + @Parameter(TaskOutputService.OutputPath.class) final String outputPath) throws IOException { + this.outputPath = outputPath; + final JobConf jobConf = new JobConf(); + fs = FileSystem.get(jobConf); + } + + /** + * create a file output stream using the given name. + * The path of the file on HDFS is 'outputPath/name/taskId'. 
+ * + * @param name name of the created output stream + * It is used as the name of the directory if the created output stream is a file output stream + * @return OutputStream to a file on HDFS. The path of the file is 'outputPath/name/taskId' + * @throws java.io.IOException + */ + @Override + public DataOutputStream create(final String name) throws IOException { + final String directoryPath = outputPath + Path.SEPARATOR + name; + if (!fs.exists(new Path(directoryPath))) { + fs.mkdirs(new Path(directoryPath)); + } + return fs.create(new Path(directoryPath + Path.SEPARATOR + getTaskId())); + } + + @Override + public void close() throws IOException { + fs.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java new file mode 100644 index 0000000..f3afb89 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/TaskOutputStreamProviderLocal.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.data.output; + +import org.apache.reef.tang.annotations.Parameter; + +import javax.inject.Inject; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +/** + * Implementation of {@link TaskOutputStreamProvider}. + * It provides FileOutputStreams on the local file system. + */ +public final class TaskOutputStreamProviderLocal extends TaskOutputStreamProvider { + + /** + * Path of the output directory on the local disk to write outputs. + */ + private final String outputPath; + + /** + * Constructor - instantiated via TANG. + * + * @param outputPath path of the output directory on the local disk to write outputs. + */ + @Inject + private TaskOutputStreamProviderLocal( + @Parameter(TaskOutputService.OutputPath.class) final String outputPath) { + this.outputPath = outputPath; + } + + /** + * create a file output stream using the given name. + * The path of the file on the local file system is 'outputPath/name/taskId'. + * + * @param name name of the created output stream + * It is used as the name of the directory if the created output stream is a file output stream + * @return OutputStream to a file on local file system. 
The path of the file is 'outputPath/name/taskId' + * @throws java.io.IOException + */ + @Override + public DataOutputStream create(final String name) throws IOException { + final String directoryPath = outputPath + File.separator + name; + final File directory = new File(directoryPath); + + synchronized (TaskOutputStreamProviderLocal.class) { + if (!directory.exists()) { + directory.mkdirs(); + } + } + + final File file = new File(directoryPath + File.separator + getTaskId()); + return new DataOutputStream(new FileOutputStream(file)); + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/15dd7869/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java new file mode 100644 index 0000000..d6711db --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/output/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +/** + * Data Output Service. + * The output service provides tasks with a common interface + * through which tasks can write outputs without + * considering the current runtime and collisions with other tasks. + */ +package org.apache.reef.io.data.output;
