[BEAM-1899] Add JStormRunnerRegistrar and empty implementations of PipelineRunner, RunnerResult, PipelineOptions.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/15ebaf0f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/15ebaf0f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/15ebaf0f Branch: refs/heads/jstorm-runner Commit: 15ebaf0f77c6194f46666f676644a2ff79fb24a1 Parents: 9d4de1b Author: Pei He <[email protected]> Authored: Thu Apr 6 14:55:25 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Apr 22 15:05:20 2017 +0800 ---------------------------------------------------------------------- .../runners/jstorm/JStormPipelineOptions.java | 26 ++++++++ .../beam/runners/jstorm/JStormRunner.java | 33 ++++++++++ .../runners/jstorm/JStormRunnerRegistrar.java | 55 +++++++++++++++++ .../beam/runners/jstorm/JStormRunnerResult.java | 63 ++++++++++++++++++++ .../beam/runners/jstorm/package-info.java | 22 +++++++ .../jstorm/JStormRunnerRegistrarTest.java | 49 +++++++++++++++ 6 files changed, 248 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/15ebaf0f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java new file mode 100644 index 0000000..cc0aed5 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm; + +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * {@link PipelineOptions} that configures the JStorm pipeline. + */ +public interface JStormPipelineOptions extends PipelineOptions { +} http://git-wip-us.apache.org/repos/asf/beam/blob/15ebaf0f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java new file mode 100644 index 0000000..78df4ca --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.runners.PipelineRunner; + +/** + * A {@link PipelineRunner} that translates the {@link Pipeline} to a JStorm DAG and executes it + * either locally or on a JStorm cluster. + */ +public class JStormRunner extends PipelineRunner<JStormRunnerResult> { + + @Override + public JStormRunnerResult run(Pipeline pipeline) { + throw new UnsupportedOperationException("This method is not yet implemented."); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/15ebaf0f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java new file mode 100644 index 0000000..b2e00ae --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +/** + * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for the + * {@link JStormRunner}. + */ +public class JStormRunnerRegistrar { + private JStormRunnerRegistrar() {} + + /** + * Register the {@link JStormPipelineOptions}. + */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>> of(JStormPipelineOptions.class); + } + } + + /** + * Register the {@link JStormRunner}. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + @Override + public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { + return ImmutableList.<Class<? extends PipelineRunner<?>>> of(JStormRunner.class); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/15ebaf0f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java new file mode 100644 index 0000000..cbd78f9 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm; + +import java.io.IOException; +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.transforms.Aggregator; +import org.joda.time.Duration; + +/** + * A {@link PipelineResult} of executing {@link org.apache.beam.sdk.Pipeline Pipelines} using + * {@link JStormRunner}. + */ +public class JStormRunnerResult implements PipelineResult { + @Override + public State getState() { + throw new UnsupportedOperationException("This method is not yet supported."); + } + + @Override + public State cancel() throws IOException { + throw new UnsupportedOperationException("This method is not yet supported."); + } + + @Override + public State waitUntilFinish(Duration duration) { + throw new UnsupportedOperationException("This method is not yet supported."); + } + + @Override + public State waitUntilFinish() { + throw new UnsupportedOperationException("This method is not yet supported."); + } + + @Override + public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) + throws AggregatorRetrievalException { + throw new UnsupportedOperationException("This method is not yet supported."); + } + + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException("This method is not yet supported."); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/15ebaf0f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/package-info.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/package-info.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/package-info.java new file mode 100644 index 0000000..a4d9428 --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the Beam runner for JStorm. + */ +package org.apache.beam.runners.jstorm; http://git-wip-us.apache.org/repos/asf/beam/blob/15ebaf0f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java new file mode 100644 index 0000000..344d3c7 --- /dev/null +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link JStormRunnerRegistrar}. + */ +@RunWith(JUnit4.class) +public class JStormRunnerRegistrarTest { + + @Test + public void testFullName() { + String[] args = + new String[] {String.format("--runner=%s", JStormRunner.class.getName())}; + PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); + assertEquals(opts.getRunner(), JStormRunner.class); + } + + @Test + public void testClassName() { + String[] args = + new String[] {String.format("--runner=%s", JStormRunner.class.getSimpleName())}; + PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); + assertEquals(opts.getRunner(), JStormRunner.class); + } +}
