http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java new file mode 100644 index 0000000..44e7b11 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex.translation.operators; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; +import org.apache.beam.runners.apex.translation.utils.NoOpStepContext; +import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; +import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.NullSideInputReader; +import org.apache.beam.sdk.util.SideInputReader; +import 
org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Apex operator for Beam {@link DoFn}. + */ +public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager { + private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class); + private boolean traceTuples = true; + + @Bind(JavaSerializer.class) + private final SerializablePipelineOptions pipelineOptions; + @Bind(JavaSerializer.class) + private final OldDoFn<InputT, OutputT> doFn; + @Bind(JavaSerializer.class) + private final TupleTag<OutputT> mainOutputTag; + @Bind(JavaSerializer.class) + private final List<TupleTag<?>> sideOutputTags; + @Bind(JavaSerializer.class) + private final WindowingStrategy<?, ?> windowingStrategy; + @Bind(JavaSerializer.class) + private final List<PCollectionView<?>> sideInputs; + + private final StateInternals<Void> sideInputStateInternals; + private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack; + private LongMin pushedBackWatermark = new LongMin(); + private long currentInputWatermark = Long.MIN_VALUE; + private long currentOutputWatermark = currentInputWatermark; + + private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner; + private transient SideInputHandler sideInputHandler; + private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping = + Maps.newHashMapWithExpectedSize(5); + + public ApexParDoOperator( + ApexPipelineOptions pipelineOptions, + OldDoFn<InputT, OutputT> doFn, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> sideOutputTags, + WindowingStrategy<?, ?> 
windowingStrategy, + List<PCollectionView<?>> sideInputs, + Coder<WindowedValue<InputT>> inputCoder, + StateInternalsFactory<Void> stateInternalsFactory + ) { + this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions); + this.doFn = doFn; + this.mainOutputTag = mainOutputTag; + this.sideOutputTags = sideOutputTags; + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + this.sideInputStateInternals = stateInternalsFactory.stateInternalsForKey(null); + + if (sideOutputTags.size() > sideOutputPorts.length) { + String msg = String.format("Too many side outputs (currently only supporting %s).", + sideOutputPorts.length); + throw new UnsupportedOperationException(msg); + } + + Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder); + this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(), + coder); + + } + + @SuppressWarnings("unused") // for Kryo + private ApexParDoOperator() { + this.pipelineOptions = null; + this.doFn = null; + this.mainOutputTag = null; + this.sideOutputTags = null; + this.windowingStrategy = null; + this.sideInputs = null; + this.pushedBack = null; + this.sideInputStateInternals = null; + } + + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = + new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { + @Override + public void process(ApexStreamTuple<WindowedValue<InputT>> t) { + if (t instanceof ApexStreamTuple.WatermarkTuple) { + processWatermark((ApexStreamTuple.WatermarkTuple<?>) t); + } else { + if (traceTuples) { + LOG.debug("\ninput {}\n", t.getValue()); + } + Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(t.getValue()); + for (WindowedValue<InputT> pushedBackValue : justPushedBack) { + pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis()); + pushedBack.get().add(pushedBackValue); + } + } + } + }; + + @InputPortFieldAnnotation(optional = true) + public final 
transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 = + new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() { + @Override + public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) { + if (t instanceof ApexStreamTuple.WatermarkTuple) { + // ignore side input watermarks + return; + } + + int sideInputIndex = 0; + if (t instanceof ApexStreamTuple.DataTuple) { + sideInputIndex = ((ApexStreamTuple.DataTuple<?>) t).getUnionTag(); + } + + if (traceTuples) { + LOG.debug("\nsideInput {} {}\n", sideInputIndex, t.getValue()); + } + + PCollectionView<?> sideInput = sideInputs.get(sideInputIndex); + sideInputHandler.addSideInputValue(sideInput, t.getValue()); + + List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); + for (WindowedValue<InputT> elem : pushedBack.get()) { + Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(elem); + Iterables.addAll(newPushedBack, justPushedBack); + } + + pushedBack.get().clear(); + pushedBackWatermark.clear(); + for (WindowedValue<InputT> pushedBackValue : newPushedBack) { + pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis()); + pushedBack.get().add(pushedBackValue); + } + + // potentially emit watermark + processWatermark(ApexStreamTuple.WatermarkTuple.of(currentInputWatermark)); + } + }; + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 = + new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 = + new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 = + new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public 
final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 = + new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 = + new DefaultOutputPort<>(); + + public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2, + sideOutput3, sideOutput4, sideOutput5}; + + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) { + DefaultOutputPort<ApexStreamTuple<?>> sideOutputPort = sideOutputPortMapping.get(tag); + if (sideOutputPort != null) { + sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple)); + } else { + output.emit(ApexStreamTuple.DataTuple.of(tuple)); + } + if (traceTuples) { + LOG.debug("\nemitting {}\n", tuple); + } + } + + private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) { + try { + pushbackDoFnRunner.startBundle(); + Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner + .processElementInReadyWindows(elem); + pushbackDoFnRunner.finishBundle(); + return pushedBack; + } catch (UserCodeException ue) { + if (ue.getCause() instanceof AssertionError) { + ApexRunner.assertionError = (AssertionError) ue.getCause(); + } + throw ue; + } + } + + private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) { + this.currentInputWatermark = mark.getTimestamp(); + + if (sideInputs.isEmpty()) { + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", mark); + } + output.emit(mark); + return; + } + + long potentialOutputWatermark = + Math.min(pushedBackWatermark.get(), currentInputWatermark); + if (potentialOutputWatermark > currentOutputWatermark) { + currentOutputWatermark = potentialOutputWatermark; + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", currentOutputWatermark); + } + output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark)); + } + } + + @Override + public void setup(OperatorContext context) { + this.traceTuples = 
ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this); + SideInputReader sideInputReader = NullSideInputReader.of(sideInputs); + if (!sideInputs.isEmpty()) { + sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals); + sideInputReader = sideInputHandler; + } + + for (int i = 0; i < sideOutputTags.size(); i++) { + @SuppressWarnings("unchecked") + DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>) + sideOutputPorts[i]; + sideOutputPortMapping.put(sideOutputTags.get(i), port); + } + + DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault( + pipelineOptions.get(), + doFn, + sideInputReader, + this, + mainOutputTag, + sideOutputTags, + new NoOpStepContext(), + new NoOpAggregatorFactory(), + windowingStrategy + ); + + pushbackDoFnRunner = + PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); + + try { + doFn.setup(); + } catch (Exception e) { + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); + } + + } + + @Override + public void beginWindow(long windowId) { + } + + @Override + public void endWindow() { + } + + /** + * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode. + * It is called from {@link org.apache.beam.runners.core.SimpleDoFnRunner}. 
+ */ + public static class NoOpAggregatorFactory implements AggregatorFactory { + + private NoOpAggregatorFactory() { + } + + @Override + public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( + Class<?> fnClass, ExecutionContext.StepContext step, + String name, CombineFn<InputT, AccumT, OutputT> combine) { + return new NoOpAggregator<>(); + } + + private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, + java.io.Serializable { + private static final long serialVersionUID = 1L; + + @Override + public void addValue(InputT value) { + } + + @Override + public String getName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public CombineFn<InputT, ?, OutputT> getCombineFn() { + // TODO Auto-generated method stub + return null; + } + + }; + } + + private static class LongMin { + long state = Long.MAX_VALUE; + + public void add(long l) { + state = Math.min(state, l); + } + + public long get() { + return state; + } + + public void clear() { + state = Long.MAX_VALUE; + } + + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java new file mode 100644 index 0000000..6fc2f0c --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.apex.translation.operators; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Throwables; + +import java.io.IOException; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; +import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple; +import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; +import org.apache.beam.runners.apex.translation.utils.ValuesSource; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Apex input operator that wraps Beam {@link UnboundedSource}. 
+ */ +public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT + extends UnboundedSource.CheckpointMark> implements InputOperator { + private static final Logger LOG = LoggerFactory.getLogger( + ApexReadUnboundedInputOperator.class); + private boolean traceTuples = false; + private long outputWatermark = 0; + + @Bind(JavaSerializer.class) + private final SerializablePipelineOptions pipelineOptions; + @Bind(JavaSerializer.class) + private final UnboundedSource<OutputT, CheckpointMarkT> source; + private final boolean isBoundedSource; + private transient UnboundedSource.UnboundedReader<OutputT> reader; + private transient boolean available = false; + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = + new DefaultOutputPort<>(); + + public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, + ApexPipelineOptions options) { + this.pipelineOptions = new SerializablePipelineOptions(options); + this.source = source; + this.isBoundedSource = false; + } + + public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, + boolean isBoundedSource, ApexPipelineOptions options) { + this.pipelineOptions = new SerializablePipelineOptions(options); + this.source = source; + this.isBoundedSource = isBoundedSource; + } + + @SuppressWarnings("unused") // for Kryo + private ApexReadUnboundedInputOperator() { + this.pipelineOptions = null; this.source = null; this.isBoundedSource = false; + } + + @Override + public void beginWindow(long windowId) { + if (!available && (isBoundedSource || source instanceof ValuesSource)) { + // if it's a Create and the input was consumed, emit final watermark + emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis()); + // terminate the stream (allows tests to finish faster) + BaseOperator.shutdown(); + } else { + emitWatermarkIfNecessary(reader.getWatermark().getMillis()); + } + } + + 
private void emitWatermarkIfNecessary(long mark) { + if (mark > outputWatermark) { + outputWatermark = mark; + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", mark); + } + output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark)); + } + } + + @Override + public void endWindow() { + } + + @Override + public void setup(OperatorContext context) { + this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this); + try { + reader = source.createReader(this.pipelineOptions.get(), null); + available = reader.start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void teardown() { + try { + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void emitTuples() { + try { + if (!available) { + available = reader.advance(); + } + if (available) { + OutputT data = reader.getCurrent(); + Instant timestamp = reader.getCurrentTimestamp(); + available = reader.advance(); + if (traceTuples) { + LOG.debug("\nemitting '{}' timestamp {}\n", data, timestamp); + } + output.emit(DataTuple.of(WindowedValue.of( + data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING))); + } + } catch (Exception e) { + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java new file mode 100644 index 0000000..6bc0194 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the Beam runner for Apache Apex. + */ +package org.apache.beam.runners.apex.translation.operators; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java new file mode 100644 index 0000000..de954c0 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the Beam runner for Apache Apex. + */ +package org.apache.beam.runners.apex.translation; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java new file mode 100644 index 0000000..17a4f81 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex.translation.utils; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.util.CombineFnUtil; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateContext; +import org.apache.beam.sdk.util.state.StateContexts; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTag.StateBinder; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.joda.time.Instant; + +/** + * Implementation of {@link StateInternals} that can be serialized and + * checkpointed with the operator. 
Suitable for small states, in the future this + * should be based on the incremental state saving components in the Apex + * library. + */ +@DefaultSerializer(JavaSerializer.class) +public class ApexStateInternals<K> implements StateInternals<K>, Serializable { + private static final long serialVersionUID = 1L; + public static <K> ApexStateInternals<K> forKey(K key) { + return new ApexStateInternals<>(key); + } + + private final K key; + + protected ApexStateInternals(K key) { + this.key = key; + } + + @Override + public K getKey() { + return key; + } + + /** + * Serializable state for internals (namespace to state tag to coded value). + */ + private final Table<String, String, byte[]> stateTable = HashBasedTable.create(); + + @Override + public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) { + return state(namespace, address, StateContexts.nullContext()); + } + + @Override + public <T extends State> T state( + StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) { + return address.bind(new ApexStateBinder(key, namespace, address, c)); + } + + /** + * A {@link StateBinder} that returns {@link State} wrappers for serialized state. + */ + private class ApexStateBinder implements StateBinder<K> { + private final K key; + private final StateNamespace namespace; + private final StateContext<?> c; + + private ApexStateBinder(K key, StateNamespace namespace, StateTag<? super K, ?> address, + StateContext<?> c) { + this.key = key; + this.namespace = namespace; + this.c = c; + } + + @Override + public <T> ValueState<T> bindValue( + StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + return new ApexValueState<>(namespace, address, coder); + } + + @Override + public <T> BagState<T> bindBag( + final StateTag<? 
super K, BagState<T>> address, Coder<T> elemCoder) { + return new ApexBagState<>(namespace, address, elemCoder); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + bindCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + final CombineFn<InputT, AccumT, OutputT> combineFn) { + return new ApexAccumulatorCombiningState<>( + namespace, + address, + accumCoder, + key, + combineFn.<K>asKeyedFn() + ); + } + + @Override + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? super W> outputTimeFn) { + return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + bindKeyedCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { + return new ApexAccumulatorCombiningState<>( + namespace, + address, + accumCoder, + key, combineFn); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + bindKeyedCombiningValueWithContext( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { + return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); + } + } + + private class AbstractState<T> { + protected final StateNamespace namespace; + protected final StateTag<?, ? extends State> address; + protected final Coder<T> coder; + + private AbstractState( + StateNamespace namespace, + StateTag<?, ? 
extends State> address, + Coder<T> coder) { + this.namespace = namespace; + this.address = address; + this.coder = coder; + } + + protected T readValue() { + T value = null; + byte[] buf = stateTable.get(namespace.stringKey(), address.getId()); + if (buf != null) { + // TODO: reuse input + Input input = new Input(buf); + try { + return coder.decode(input, Context.OUTER); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return value; + } + + public void writeValue(T input) { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + try { + coder.encode(input, output, Context.OUTER); + stateTable.put(namespace.stringKey(), address.getId(), output.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void clear() { + stateTable.remove(namespace.stringKey(), address.getId()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + @SuppressWarnings("unchecked") + AbstractState<?> that = (AbstractState<?>) o; + return namespace.equals(that.namespace) && address.equals(that.address); + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + address.hashCode(); + return result; + } + } + + private class ApexValueState<T> extends AbstractState<T> implements ValueState<T> { + + private ApexValueState( + StateNamespace namespace, + StateTag<?, ValueState<T>> address, + Coder<T> coder) { + super(namespace, address, coder); + } + + @Override + public ApexValueState<T> readLater() { + return this; + } + + @Override + public T read() { + return readValue(); + } + + @Override + public void write(T input) { + writeValue(input); + } + } + + private final class ApexWatermarkHoldState<W extends BoundedWindow> + extends AbstractState<Instant> implements WatermarkHoldState<W> { + + private final OutputTimeFn<? 
super W> outputTimeFn; + + public ApexWatermarkHoldState( + StateNamespace namespace, + StateTag<?, WatermarkHoldState<W>> address, + OutputTimeFn<? super W> outputTimeFn) { + super(namespace, address, InstantCoder.of()); + this.outputTimeFn = outputTimeFn; + } + + @Override + public ApexWatermarkHoldState<W> readLater() { + return this; + } + + /** Returns the stored hold as decoded by {@code readValue()}. */ @Override + public Instant read() { + return readValue(); + } + + /** Stores {@code outputTime} when nothing is held yet, otherwise the result of {@code outputTimeFn.combine(stored, outputTime)}. */ @Override + public void add(Instant outputTime) { + Instant combined = read(); + combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime); + writeValue(combined); + } + + /** Returns a view whose {@code read()} is true while the state table has no entry for this namespace and tag. */ @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> readLater() { + return this; + } + @Override + public Boolean read() { + return stateTable.get(namespace.stringKey(), address.getId()) == null; + } + }; + } + + @Override + public OutputTimeFn<? super W> getOutputTimeFn() { + return outputTimeFn; + } + + } + + /** Combining state for one {@code key}: keeps the key and the {@code KeyedCombineFn} that is applied to the accumulator stored through {@code AbstractState}. */ private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> + extends AbstractState<AccumT> + implements AccumulatorCombiningState<InputT, AccumT, OutputT> { + private final K key; + private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; + + private ApexAccumulatorCombiningState(StateNamespace namespace, + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> coder, + K key, KeyedCombineFn<? 
super K, InputT, AccumT, OutputT> combineFn) { + super(namespace, address, coder); + this.key = key; + this.combineFn = combineFn; + } + + @Override + public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() { + return this; + } + + /** + * Extracts the output from the stored accumulator, or from a newly created one when nothing + * is stored. + */ + @Override + public OutputT read() { + return combineFn.extractOutput(key, getAccum()); + } + + /** + * Folds {@code input} into the accumulator and persists the result. + * + * <p>The accumulator returned by {@code addInput} is the one that gets written: a combine + * function may return a new accumulator instead of mutating the one passed in. + */ + @Override + public void add(InputT input) { + AccumT updated = combineFn.addInput(key, getAccum(), input); + writeValue(updated); + } + + /** + * Returns the stored accumulator, or a new one from {@code createAccumulator} when the state + * is empty. The new accumulator is not written back by this method. + */ + @Override + public AccumT getAccum() { + AccumT accum = readValue(); + if (accum == null) { + accum = combineFn.createAccumulator(key); + } + return accum; + } + + /** + * Returns a view whose {@code read()} is true while the state table has no entry for this + * namespace and tag. + */ + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> readLater() { + return this; + } + @Override + public Boolean read() { + return stateTable.get(namespace.stringKey(), address.getId()) == null; + } + }; + } + + /** + * Merges {@code accum} with the current accumulator and persists the merged result. + */ + @Override + public void addAccum(AccumT accum) { + accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum)); + writeValue(accum); + } + + /** + * Merges the given accumulators with the combine function; the stored state is not touched. + */ + @Override + public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { + return combineFn.mergeAccumulators(key, accumulators); + } + + } + + /** + * Bag state that stores all elements as one list encoded with a {@link ListCoder}. + */ + private final class ApexBagState<T> extends AbstractState<List<T>> implements BagState<T> { + private ApexBagState( + StateNamespace namespace, + StateTag<?, BagState<T>> address, + Coder<T> coder) { + super(namespace, address, ListCoder.of(coder)); + } + + @Override + public ApexBagState<T> readLater() { + return this; + } + + /** + * Returns the stored list, or a new empty list when nothing is stored. + */ + @Override + public List<T> read() { + List<T> value = super.readValue(); + if (value == null) { + value = new ArrayList<>(); + } + return value; + } + + /** + * Appends {@code input} by reading the whole list, adding the element and writing it back. + */ + @Override + public void add(T input) { + List<T> value = read(); + value.add(input); + writeValue(value); + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> 
readLater() { + return this; + } + + /** True while the state table has no entry for this namespace and tag. */ @Override + public Boolean read() { + return stateTable.get(namespace.stringKey(), address.getId()) == null; + } + }; + } + } + + /** + * Factory for {@link ApexStateInternals}. + * + * @param <K> key type + */ + public static class ApexStateInternalsFactory<K> + implements StateInternalsFactory<K>, Serializable { + private static final long serialVersionUID = 1L; + + /** Delegates to {@code ApexStateInternals.forKey(key)}. */ @Override + public StateInternals<K> stateInternalsForKey(K key) { + return ApexStateInternals.forKey(key); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java new file mode 100644 index 0000000..79a4f1b --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex.translation.utils; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.datatorrent.api.Operator; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StandardCoder; + +/** + * The common interface for all objects transmitted through streams. + * + * @param <T> The actual payload type. + */ +public interface ApexStreamTuple<T> { + /** + * Gets the value of the tuple. + * + * @return the payload carried by this tuple + */ + T getValue(); + + /** + * Data tuple class: a payload value together with an integer union tag. + * + * @param <T> tuple type + */ + class DataTuple<T> implements ApexStreamTuple<T> { + private int unionTag; + private T value; + + /** + * Creates a data tuple for {@code value} with union tag {@code 0}. + */ + public static <T> DataTuple<T> of(T value) { + return new DataTuple<>(value, 0); + } + + private DataTuple(T value, int unionTag) { + this.value = value; + this.unionTag = unionTag; + } + + @Override + public T getValue() { + return value; + } + + public void setValue(T value) { + this.value = value; + } + + public int getUnionTag() { + return unionTag; + } + + public void setUnionTag(int unionTag) { + this.unionTag = unionTag; + } + + /** + * Returns the string form of the value; a {@code null} value yields {@code "null"} instead + * of a {@link NullPointerException}. + */ + @Override + public String toString() { + return String.valueOf(value); + } + + } + + /** + * Tuple that includes a timestamp. 
+ * + * @param <T> tuple type + */ + class TimestampedTuple<T> extends DataTuple<T> { + private long timestamp; + + /** + * Creates a tuple with the given timestamp and value; the union tag is {@code 0}. + */ + public TimestampedTuple(long timestamp, T value) { + super(value, 0); + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + /** + * Hashes the timestamp only. + */ + @Override + public int hashCode() { + return Objects.hash(timestamp); + } + + /** + * Equal to any other {@code TimestampedTuple} with the same timestamp and an equal value. + */ + @Override + public boolean equals(Object obj) { + if (obj instanceof TimestampedTuple) { + TimestampedTuple<?> that = (TimestampedTuple<?>) obj; + if (timestamp != that.timestamp) { + return false; + } + return Objects.equals(this.getValue(), that.getValue()); + } + return false; + } + + } + + /** + * Tuple that represents a watermark. + * + * @param <T> tuple type + */ + class WatermarkTuple<T> extends TimestampedTuple<T> { + /** + * Creates a watermark for the given timestamp. + */ + public static <T> WatermarkTuple<T> of(long timestamp) { + return new WatermarkTuple<>(timestamp); + } + + /** + * A watermark carries a timestamp and a {@code null} value. + */ + protected WatermarkTuple(long timestamp) { + super(timestamp, null); + } + + @Override + public String toString() { + return "[Watermark " + getTimestamp() + "]"; + } + } + + /** + * Coder for {@link ApexStreamTuple}. 
+ */ + class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> { + private static final long serialVersionUID = 1L; + final Coder<T> valueCoder; + + public static <T> ApexStreamTupleCoder<T> of(Coder<T> valueCoder) { + return new ApexStreamTupleCoder<>(valueCoder); + } + + protected ApexStreamTupleCoder(Coder<T> valueCoder) { + this.valueCoder = checkNotNull(valueCoder); + } + + /** + * Writes a marker byte ({@code 1} for a watermark, {@code 0} for data), followed either by + * the watermark timestamp as a long, or by the one-byte union tag and the encoded value. + * + * @throws CoderException if the union tag of a data tuple does not fit into one byte + */ + @Override + public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context) + throws CoderException, IOException { + if (value instanceof WatermarkTuple) { + outStream.write(1); + new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp()); + } else { + int unionTag = ((DataTuple<?>) value).unionTag; + if (unionTag < 0 || unionTag > 0xFF) { + // OutputStream.write(int) keeps only the low 8 bits; fail instead of corrupting the tag. + throw new CoderException("unionTag out of range [0, 255]: " + unionTag); + } + outStream.write(0); + outStream.write(unionTag); + valueCoder.encode(value.getValue(), outStream, context); + } + } + + /** + * Reads a tuple written by {@code encode}. + * + * @throws java.io.EOFException if the stream ends before the marker byte or the union tag + */ + @Override + public ApexStreamTuple<T> decode(InputStream inStream, Context context) + throws CoderException, IOException { + int marker = inStream.read(); + if (marker < 0) { + throw new java.io.EOFException("reached end of stream before the tuple marker"); + } + if (marker == 1) { + return new WatermarkTuple<>(new DataInputStream(inStream).readLong()); + } + int unionTag = inStream.read(); + if (unionTag < 0) { + throw new java.io.EOFException("reached end of stream before the union tag"); + } + return new DataTuple<>(valueCoder.decode(inStream, context), unionTag); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Arrays.<Coder<?>>asList(valueCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + verifyDeterministic( + this.getClass().getSimpleName() + " requires a deterministic valueCoder", + valueCoder); + } + + /** + * Returns the value coder. + */ + public Coder<T> getValueCoder() { + return valueCoder; + } + + } + + /** + * Central switch that decides whether data tuples received on and emitted from ports should + * be logged. Should be called in setup, with the result cached in the operator. 
+ */ + final class Logging { + /** Returns {@code options.isTupleTracingEnabled()}; the {@code operator} argument is not consulted by this implementation. */ public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) { + return options.isTupleTracingEnabled(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java new file mode 100644 index 0000000..d08e76f --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex.translation.utils; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.netlet.util.Slice; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; + +/** + * Adapter that lets Apex use a Beam {@link Coder} as its {@link StreamCodec}. + */ +public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializable { + private static final long serialVersionUID = 1L; + private final Coder<? super Object> coder; + + public CoderAdapterStreamCodec(Coder<? super Object> coder) { + this.coder = coder; + } + + /** + * Decodes the bytes of {@code fragment} with the wrapped coder; an {@link IOException} is + * rethrown as a {@link RuntimeException}. + */ + @Override + public Object fromByteArray(Slice fragment) { + ByteArrayInputStream in = + new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length); + try { + return coder.decode(in, Context.OUTER); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Encodes {@code wv} with the wrapped coder into a new {@link Slice}; an {@link IOException} + * is rethrown as a {@link RuntimeException}. + */ + @Override + public Slice toByteArray(Object wv) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + coder.encode(wv, out, Context.OUTER); + return new Slice(out.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Uses the hash code of the tuple as its partition key. + */ + @Override + public int getPartition(Object o) { + return o.hashCode(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java new file mode 100644 index 0000000..078f95f --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -0,0 +1,72 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translation.utils; + +import java.io.IOException; +import java.io.Serializable; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Serializable {@link ExecutionContext.StepContext} that does nothing. 
+ */ +public class NoOpStepContext implements ExecutionContext.StepContext, Serializable { + private static final long serialVersionUID = 1L; + + /** Always returns {@code null}. */ @Override + public String getStepName() { + return null; + } + + /** Always returns {@code null}. */ @Override + public String getTransformName() { + return null; + } + + /** Does nothing. */ @Override + public void noteOutput(WindowedValue<?> output) { + } + + /** Does nothing. */ @Override + public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { + } + + /** Does nothing; the view data is not written anywhere. */ @Override + public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, + Iterable<WindowedValue<T>> data, + Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws + IOException { + + } + + /** Always returns {@code null}; callers must not expect state from this context. */ @Override + public StateInternals<?> stateInternals() { + return null; + } + + /** Always returns {@code null}; callers must not expect timers from this context. */ @Override + public TimerInternals timerInternals() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java new file mode 100644 index 0000000..d0dce2b --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translation.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Makes {@link PipelineOptions} serializable by writing them as a JSON string. + */ +public class SerializablePipelineOptions implements Externalizable { + + private transient ApexPipelineOptions pipelineOptions; + + /** + * Wraps the given options. + */ + public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) { + this.pipelineOptions = pipelineOptions; + } + + /** + * No-argument constructor; {@link #readExternal} fills in the options afterwards. + */ + public SerializablePipelineOptions() { + } + + public ApexPipelineOptions get() { + return pipelineOptions; + } + + /** + * Writes the options as a JSON string produced by a Jackson {@link ObjectMapper}. + * + * <p>NOTE(review): {@code writeUTF} is limited to 65535 encoded bytes - confirm that the + * serialized options always stay below that size. + */ + @Override + public void writeExternal(ObjectOutput out) throws IOException { + String json = new ObjectMapper().writeValueAsString(pipelineOptions); + out.writeUTF(json); + } + + /** + * Reads the JSON string written by {@link #writeExternal} and views the parsed options as + * {@link ApexPipelineOptions}. + */ + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + String json = in.readUTF(); + PipelineOptions parsed = new ObjectMapper().readValue(json, PipelineOptions.class); + this.pipelineOptions = parsed.as(ApexPipelineOptions.class); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java ---------------------------------------------------------------------- diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java new file mode 100644 index 0000000..395ad1f --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translation.utils; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoSerializable; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.JavaSerializer; + +import java.io.IOException; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; + + +/** + * A {@link KryoSerializable} holder that uses the specified {@link Coder}. 
+ * @param <T> element type + */ +public class ValueAndCoderKryoSerializable<T> implements KryoSerializable { + private static final JavaSerializer JAVA_SERIALIZER = new JavaSerializer(); + private T value; + private Coder<T> coder; + + public ValueAndCoderKryoSerializable(T value, Coder<T> coder) { + this.value = value; + this.coder = coder; + } + + @SuppressWarnings("unused") // for Kryo + private ValueAndCoderKryoSerializable() { + } + + /** Returns the wrapped value. */ public T get() { + return value; + } + + /** Writes the coder class, then the coder through {@code JAVA_SERIALIZER}, then the value encoded with that coder. */ @Override + public void write(Kryo kryo, Output output) { + try { + kryo.writeClass(output, coder.getClass()); + kryo.writeObject(output, coder, JAVA_SERIALIZER); + coder.encode(value, output, Context.OUTER); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Reads back in the order used by {@code write}: coder class, coder instance, then the value decoded with it. */ @Override + public void read(Kryo kryo, Input input) { + try { + @SuppressWarnings("unchecked") + Class<Coder<T>> type = kryo.readClass(input).getType(); + coder = kryo.readObject(input, type, JAVA_SERIALIZER); + value = coder.decode(input, Context.OUTER); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java new file mode 100644 index 0000000..8526618 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translation.utils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Instant; + +/** + * Unbounded source that reads from a Java {@link Iterable}. + */ +public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { + private static final long serialVersionUID = 1L; + + private final byte[] codedValues; + private final IterableCoder<T> iterableCoder; + + /** Encodes {@code values} with an {@code IterableCoder} built from {@code coder} and keeps only the resulting bytes; an {@code IOException} during encoding is rethrown as a {@code RuntimeException}. */ public ValuesSource(Iterable<T> values, Coder<T> coder) { + this.iterableCoder = IterableCoder.of(coder); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + iterableCoder.encode(values, bos, Context.OUTER); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + this.codedValues = bos.toByteArray(); + } + + @Override + public java.util.List<? 
extends UnboundedSource<T, CheckpointMark>> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + return Collections.singletonList(this); + } + + /** + * Decodes the stored bytes and returns a reader over the decoded values. The + * {@code checkpointMark} is not used, so a new reader always starts at the first value. + */ + @Override + public UnboundedReader<T> createReader(PipelineOptions options, + @Nullable CheckpointMark checkpointMark) { + ByteArrayInputStream bis = new ByteArrayInputStream(codedValues); + try { + Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER); + return new ValuesReader<>(values, this); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Nullable + @Override + public Coder<CheckpointMark> getCheckpointMarkCoder() { + return null; + } + + @Override + public void validate() { + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return iterableCoder.getElemCoder(); + } + + /** + * Reader that walks the decoded values once. Timestamps and the watermark are the wall-clock + * time at the moment of the call. + */ + private static class ValuesReader<T> extends UnboundedReader<T> { + + private final Iterable<T> values; + private final UnboundedSource<T, CheckpointMark> source; + private transient Iterator<T> iterator; + private T current; + + public ValuesReader(Iterable<T> values, UnboundedSource<T, CheckpointMark> source) { + this.values = values; + this.source = source; + } + + @Override + public boolean start() throws IOException { + // advance() creates the iterator on first use. + return advance(); + } + + /** + * Moves to the next value. + * + * @return {@code true} if a value is available through {@link #getCurrent()} + */ + @Override + public boolean advance() throws IOException { + if (iterator == null) { + // The iterator is transient and was previously created only by start(); create it here + // as well so that calling advance() first does not fail with a NullPointerException. + iterator = values.iterator(); + } + if (!iterator.hasNext()) { + return false; + } + current = iterator.next(); + return true; + } + + @Override + public T getCurrent() throws NoSuchElementException { + return current; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public void close() throws IOException { + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public CheckpointMark getCheckpointMark() { + return null; + } + + @Override + public UnboundedSource<T, ?> getCurrentSource() { + return 
source; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java new file mode 100644 index 0000000..534b645 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the Beam runner for Apache Apex. 
+ */ +package org.apache.beam.runners.apex.translation.utils; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java deleted file mode 100644 index 539f311..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.translators; - -import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; -import org.apache.beam.runners.apex.translators.io.ValuesSource; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.PBegin; - - -/** - * Wraps elements from Create.Values into an {@link UnboundedSource}. 
- * mainly used for testing - */ -public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> { - private static final long serialVersionUID = 1451000241832745629L; - - @Override - public void translate(Create.Values<T> transform, TranslationContext context) { - try { - UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(), - transform.getDefaultOutputCoder((PBegin) context.getInput())); - ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( - unboundedSource, context.getPipelineOptions()); - context.addOperator(operator, operator.output); - } catch (CannotProvideCoderException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java deleted file mode 100644 index a39aacb..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.translators; - -import com.google.common.collect.Lists; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.beam.runners.apex.translators.functions.ApexFlattenOperator; -import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; -import org.apache.beam.runners.apex.translators.io.ValuesSource; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; - -/** - * {@link Flatten.FlattenPCollectionList} translation to Apex operator. 
- */ -public class FlattenPCollectionTranslator<T> implements - TransformTranslator<Flatten.FlattenPCollectionList<T>> { - private static final long serialVersionUID = 1L; - - @Override - public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) { - PCollectionList<T> input = context.getInput(); - List<PCollection<T>> collections = input.getAll(); - - if (collections.isEmpty()) { - // create a dummy source that never emits anything - @SuppressWarnings("unchecked") - UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST, - (Coder<T>) VoidCoder.of()); - ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( - unboundedSource, context.getPipelineOptions()); - context.addOperator(operator, operator.output); - } else { - PCollection<T> output = context.getOutput(); - Map<PCollection<?>, Integer> unionTags = Collections.emptyMap(); - flattenCollections(collections, unionTags, output, context); - } - } - - /** - * Flatten the given collections into the given result collection. Translates - * into a cascading merge with 2 input ports per operator. The optional union - * tags can be used to identify the source in the result stream, used to - * channel multiple side inputs to a single Apex operator port. 
- * - * @param collections - * @param unionTags - * @param finalCollection - * @param context - */ - static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>, - Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) { - List<PCollection<T>> remainingCollections = Lists.newArrayList(); - PCollection<T> firstCollection = null; - while (!collections.isEmpty()) { - for (PCollection<T> collection : collections) { - if (null == firstCollection) { - firstCollection = collection; - } else { - ApexFlattenOperator<T> operator = new ApexFlattenOperator<>(); - context.addStream(firstCollection, operator.data1); - Integer unionTag = unionTags.get(firstCollection); - operator.data1Tag = (unionTag != null) ? unionTag : 0; - context.addStream(collection, operator.data2); - unionTag = unionTags.get(collection); - operator.data2Tag = (unionTag != null) ? unionTag : 0; - - if (!collection.getCoder().equals(firstCollection.getCoder())) { - throw new UnsupportedOperationException("coders don't match"); - } - - if (collections.size() > 2) { - PCollection<T> intermediateCollection = intermediateCollection(collection, - collection.getCoder()); - context.addOperator(operator, operator.out, intermediateCollection); - remainingCollections.add(intermediateCollection); - } else { - // final stream merge - context.addOperator(operator, operator.out, finalCollection); - } - firstCollection = null; - } - } - if (firstCollection != null) { - // push to next merge level - remainingCollections.add(firstCollection); - firstCollection = null; - } - if (remainingCollections.size() > 1) { - collections = remainingCollections; - remainingCollections = Lists.newArrayList(); - } else { - collections = Lists.newArrayList(); - } - } - } - - static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) { - PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), - input.getWindowingStrategy(), 
input.isBounded()); - output.setCoder(outputCoder); - return output; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java deleted file mode 100644 index cb78579..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.translators; - -import org.apache.beam.runners.apex.translators.functions.ApexGroupByKeyOperator; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -/** - * {@link GroupByKey} translation to Apex operator. 
- */ -public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> { - private static final long serialVersionUID = 1L; - - @Override - public void translate(GroupByKey<K, V> transform, TranslationContext context) { - PCollection<KV<K, V>> input = context.getInput(); - ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(), - input, context.<K>stateInternalsFactory() - ); - context.addOperator(group, group.output); - context.addStream(input, group.input); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java deleted file mode 100644 index 987b729..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.apex.translators; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.datatorrent.api.Operator; -import com.datatorrent.api.Operator.OutputPort; -import com.google.common.collect.Maps; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. 
- */ -public class ParDoBoundMultiTranslator<InputT, OutputT> - implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslator.class); - - @Override - public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { - OldDoFn<InputT, OutputT> doFn = transform.getFn(); - PCollectionTuple output = context.getOutput(); - PCollection<InputT> input = context.getInput(); - List<PCollectionView<?>> sideInputs = transform.getSideInputs(); - Coder<InputT> inputCoder = input.getCoder(); - WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>( - context.getPipelineOptions(), - doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), - context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder, - context.<Void>stateInternalsFactory() - ); - - Map<TupleTag<?>, PCollection<?>> outputs = output.getAll(); - Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size()); - for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) { - if (outputEntry.getKey() == transform.getMainOutputTag()) { - ports.put(outputEntry.getValue(), operator.output); - } else { - int portIndex = 0; - for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) { - if (tag == outputEntry.getKey()) { - ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]); - break; - } - portIndex++; - } - } - } - context.addOperator(operator, ports); - context.addStream(context.getInput(), operator.input); - if (!sideInputs.isEmpty()) { - addSideInputs(operator, sideInputs, context); - } - } - - static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> 
sideInputs, - TranslationContext context) { - Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1}; - if (sideInputs.size() > sideInputPorts.length) { - PCollection<?> unionCollection = unionSideInputs(sideInputs, context); - context.addStream(unionCollection, sideInputPorts[0]); - } else { - // the number of ports for side inputs is fixed and each port can only take one input. - for (int i = 0; i < sideInputs.size(); i++) { - context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]); - } - } - } - - private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs, - TranslationContext context) { - checkArgument(sideInputs.size() > 1, "requires multiple side inputs"); - // flatten and assign union tag - List<PCollection<Object>> sourceCollections = new ArrayList<>(); - Map<PCollection<?>, Integer> unionTags = new HashMap<>(); - PCollection<Object> firstSideInput = context.getViewInput(sideInputs.get(0)); - for (int i = 0; i < sideInputs.size(); i++) { - PCollectionView<?> sideInput = sideInputs.get(i); - PCollection<?> sideInputCollection = context.getViewInput(sideInput); - if (!sideInputCollection.getWindowingStrategy().equals( - firstSideInput.getWindowingStrategy())) { - // TODO: check how to handle this in stream codec - //String msg = "Multiple side inputs with different window strategies."; - //throw new UnsupportedOperationException(msg); - LOG.warn("Side inputs union with different windowing strategies {} {}", - firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy()); - } - if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) { - String msg = "Multiple side inputs with different coders."; - throw new UnsupportedOperationException(msg); - } - sourceCollections.add(context.<PCollection<Object>>getViewInput(sideInput)); - unionTags.put(sideInputCollection, i); - } - - PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection( - 
firstSideInput, firstSideInput.getCoder()); - FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection, - context); - return resultCollection; - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java deleted file mode 100644 index 92567a6..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.apex.translators; - -import java.util.List; - -import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; - -/** - * {@link ParDo.Bound} is translated to {link ApexParDoOperator} that wraps the {@link DoFn}. - */ -public class ParDoBoundTranslator<InputT, OutputT> implements - TransformTranslator<ParDo.Bound<InputT, OutputT>> { - private static final long serialVersionUID = 1L; - - @Override - public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { - OldDoFn<InputT, OutputT> doFn = transform.getFn(); - PCollection<OutputT> output = context.getOutput(); - PCollection<InputT> input = context.getInput(); - List<PCollectionView<?>> sideInputs = transform.getSideInputs(); - Coder<InputT> inputCoder = input.getCoder(); - WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder, - input.getWindowingStrategy().getWindowFn().windowCoder()); - - ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>( - context.getPipelineOptions(), - doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/, - output.getWindowingStrategy(), sideInputs, wvInputCoder, - context.<Void>stateInternalsFactory() - ); - context.addOperator(operator, operator.output); - context.addStream(context.getInput(), operator.input); - if (!sideInputs.isEmpty()) { - ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context); - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java deleted file mode 100644 index 3097276..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.translators; - -import com.datatorrent.api.InputOperator; - -import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.UnboundedSource; - -/** - * {@link Read.Unbounded} is translated to Apex {@link InputOperator} - * that wraps {@link UnboundedSource}. 
- */ -public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> { - private static final long serialVersionUID = 1L; - - @Override - public void translate(Read.Unbounded<T> transform, TranslationContext context) { - UnboundedSource<T, ?> unboundedSource = transform.getSource(); - ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( - unboundedSource, context.getPipelineOptions()); - context.addOperator(operator, operator.output); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java deleted file mode 100644 index dfd2045..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.apex.translators; - - -import java.io.Serializable; - -import org.apache.beam.sdk.transforms.PTransform; - -/** - * Translates {@link PTransform} to Apex functions. - */ -public interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable { - void translate(T transform, TranslationContext context); -}