http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java deleted file mode 100644 index 07c6494..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.datatorrent.api.Context.PortContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.Operator; -import com.datatorrent.api.Operator.InputPort; -import com.datatorrent.api.Operator.OutputPort; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.translators.utils.ApexStateInternals; -import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; -import org.apache.beam.runners.apex.translators.utils.CoderAdapterStreamCodec; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.TransformTreeNode; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.apache.beam.sdk.util.state.StateInternalsFactory; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; - -/** - * Maintains context data for {@link TransformTranslator}s. - */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class TranslationContext { - - private final ApexPipelineOptions pipelineOptions; - private AppliedPTransform<?, ?, ?> currentTransform; - private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>(); - private final Map<String, Operator> operators = new HashMap<>(); - private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>(); - - public void addView(PCollectionView<?> view) { - this.viewInputs.put(view, this.getInput()); - } - - public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) { - PInput input = this.viewInputs.get(view); - checkArgument(input != null, "unknown view " + view.getName()); - return (InputT) input; - } - - public TranslationContext(ApexPipelineOptions pipelineOptions) { - this.pipelineOptions = pipelineOptions; - } - - public void setCurrentTransform(TransformTreeNode treeNode) { - this.currentTransform = AppliedPTransform.of(treeNode.getFullName(), - treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform()); - } - - public ApexPipelineOptions getPipelineOptions() { - return pipelineOptions; - } - - public <InputT extends PInput> InputT getInput() { - return (InputT) getCurrentTransform().getInput(); - } - - public <OutputT extends POutput> OutputT getOutput() { - return (OutputT) getCurrentTransform().getOutput(); - } - - private AppliedPTransform<?, ?, ?> getCurrentTransform() { - checkArgument(currentTransform != null, "current transform not set"); - return currentTransform; - } - - public void addOperator(Operator operator, OutputPort port) { - addOperator(operator, port, this.<PCollection<?>>getOutput()); - } - - /** - * Register operator and output ports for the given collections. - * @param operator - * @param ports - */ - public void addOperator(Operator operator, Map<PCollection<?>, OutputPort<?>> ports) { - boolean first = true; - for (Map.Entry<PCollection<?>, OutputPort<?>> portEntry : ports.entrySet()) { - if (first) { - addOperator(operator, portEntry.getValue(), portEntry.getKey()); - first = false; - } else { - this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(), - new ArrayList<>())); - } - } - } - - /** - * Add the operator with its output port for the given result {link PCollection}. - * @param operator - * @param port - * @param output - */ - public void addOperator(Operator operator, OutputPort port, PCollection output) { - // Apex DAG requires a unique operator name - // use the transform's name and make it unique - String name = getCurrentTransform().getFullName(); - for (int i = 1; this.operators.containsKey(name); i++) { - name = getCurrentTransform().getFullName() + i; - } - this.operators.put(name, operator); - this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>())); - } - - public void addStream(PInput input, InputPort inputPort) { - Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input); - checkArgument(stream != null, "no upstream operator defined for %s", input); - stream.getRight().add(inputPort); - } - - public void populateDAG(DAG dag) { - for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) { - dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue()); - } - int streamIndex = 0; - for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this. - streams.entrySet()) { - List<InputPort<?>> sinksList = streamEntry.getValue().getRight(); - InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]); - if (sinks.length > 0) { - dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks); - for (InputPort port : sinks) { - PCollection pc = streamEntry.getKey(); - Coder coder = pc.getCoder(); - if (pc.getWindowingStrategy() != null) { - coder = FullWindowedValueCoder.of(pc.getCoder(), - pc.getWindowingStrategy().getWindowFn().windowCoder() - ); - } - Coder<Object> wrapperCoder = ApexStreamTuple.ApexStreamTupleCoder.of(coder); - CoderAdapterStreamCodec streamCodec = new CoderAdapterStreamCodec(wrapperCoder); - dag.setInputPortAttribute(port, PortContext.STREAM_CODEC, streamCodec); - } - } - } - } - - /** - * Return the {@link StateInternalsFactory} for the pipeline translation. - * @return - */ - public <K> StateInternalsFactory<K> stateInternalsFactory() { - return new ApexStateInternals.ApexStateInternalsFactory(); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java deleted file mode 100644 index 703b1f4..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.functions; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; - -import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; -import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.WatermarkTuple; -import org.apache.beam.sdk.util.WindowedValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Apex operator for Beam {@link Flatten.FlattenPCollectionList}. - */ -public class ApexFlattenOperator<InputT> extends BaseOperator { - - private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class); - private boolean traceTuples = false; - - private long inputWM1; - private long inputWM2; - private long outputWM; - - public int data1Tag; - public int data2Tag; - - /** - * Data input port 1. - */ - public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 = - new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { - /** - * Emits to port "out" - */ - @Override - public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) { - if (tuple instanceof WatermarkTuple) { - WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple; - if (wmTuple.getTimestamp() > inputWM1) { - inputWM1 = wmTuple.getTimestamp(); - if (inputWM1 <= inputWM2) { - // move output watermark and emit it - outputWM = inputWM1; - if (traceTuples) { - LOG.debug("\nemitting watermark {}\n", outputWM); - } - out.emit(tuple); - } - } - return; - } - if (traceTuples) { - LOG.debug("\nemitting {}\n", tuple); - } - - if (data1Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) { - ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data1Tag); - } - out.emit(tuple); - } - }; - - /** - * Data input port 2. - */ - public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 = - new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { - /** - * Emits to port "out" - */ - @Override - public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) { - if (tuple instanceof WatermarkTuple) { - WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple; - if (wmTuple.getTimestamp() > inputWM2) { - inputWM2 = wmTuple.getTimestamp(); - if (inputWM2 <= inputWM1) { - // move output watermark and emit it - outputWM = inputWM2; - if (traceTuples) { - LOG.debug("\nemitting watermark {}\n", outputWM); - } - out.emit(tuple); - } - } - return; - } - if (traceTuples) { - LOG.debug("\nemitting {}\n", tuple); - } - - if (data2Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) { - ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data2Tag); - } - out.emit(tuple); - } - }; - - /** - * Output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = - new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>(); - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java deleted file mode 100644 index 4c28c85..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java +++ /dev/null @@ -1,478 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.functions; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; -import com.datatorrent.api.StreamCodec; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.base.Throwables; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; - -import java.io.IOException; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; -import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions; -import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; -import org.apache.beam.runners.core.SystemReduceFn; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Apex operator for Beam {@link GroupByKey}. - * This operator expects the input stream already partitioned by K, - * which is determined by the {@link StreamCodec} on the input port. - * - * @param <K> - * @param <V> - */ -public class ApexGroupByKeyOperator<K, V> implements Operator { - private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class); - private boolean traceTuples = true; - - @Bind(JavaSerializer.class) - private WindowingStrategy<V, BoundedWindow> windowingStrategy; - @Bind(JavaSerializer.class) - private Coder<K> keyCoder; - @Bind(JavaSerializer.class) - private Coder<V> valueCoder; - - @Bind(JavaSerializer.class) - private final SerializablePipelineOptions serializedOptions; - @Bind(JavaSerializer.class) - private final StateInternalsFactory<K> stateInternalsFactory; - private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>(); - private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); - - private transient ProcessContext context; - private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn; - private transient ApexTimerInternals timerInternals = new ApexTimerInternals(); - private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; - - public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input = - new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() { - @Override - public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) { - try { - if (t instanceof ApexStreamTuple.WatermarkTuple) { - ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>) t; - processWatermark(mark); - if (traceTuples) { - LOG.debug("\nemitting watermark {}\n", mark.getTimestamp()); - } - output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of( - mark.getTimestamp())); - return; - } - if (traceTuples) { - LOG.debug("\ninput {}\n", t.getValue()); - } - processElement(t.getValue()); - } catch (Exception e) { - Throwables.propagateIfPossible(e); - throw new RuntimeException(e); - } - } - }; - - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>> - output = new DefaultOutputPort<>(); - - @SuppressWarnings("unchecked") - public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input, - StateInternalsFactory<K> stateInternalsFactory) { - checkNotNull(pipelineOptions); - this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); - this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy(); - this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder(); - this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder(); - this.stateInternalsFactory = stateInternalsFactory; - } - - @SuppressWarnings("unused") // for Kryo - private ApexGroupByKeyOperator() { - this.serializedOptions = null; - this.stateInternalsFactory = null; - } - - @Override - public void beginWindow(long l) { - } - - @Override - public void endWindow() { - } - - @Override - public void setup(OperatorContext context) { - this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this); - StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory(); - this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, - stateInternalsFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder)); - this.context = new ProcessContext(fn, this.timerInternals); - } - - @Override - public void teardown() { - } - - /** - * Returns the list of timers that are ready to fire. These are the timers - * that are registered to be triggered at a time before the current watermark. - * We keep these timers in a Set, so that they are deduplicated, as the same - * timer can be registered multiple times. - */ - private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess( - long currentWatermark) { - - // we keep the timers to return in a different list and launch them later - // because we cannot prevent a trigger from registering another trigger, - // which would lead to concurrent modification exception. - Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create(); - - Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it = - activeTimers.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); - - Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator(); - while (timerIt.hasNext()) { - TimerInternals.TimerData timerData = timerIt.next(); - if (timerData.getTimestamp().isBefore(currentWatermark)) { - toFire.put(keyWithTimers.getKey(), timerData); - timerIt.remove(); - } - } - - if (keyWithTimers.getValue().isEmpty()) { - it.remove(); - } - } - return toFire; - } - - private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception { - final KV<K, V> kv = windowedValue.getValue(); - final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPane()); - - KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem( - kv.getKey(), - Collections.singletonList(updatedWindowedValue)); - - context.setElement(kwi, getStateInternalsForKey(kwi.key())); - fn.processElement(context); - } - - private StateInternals<K> getStateInternalsForKey(K key) { - final ByteBuffer keyBytes; - try { - keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); - } catch (CoderException e) { - throw new RuntimeException(e); - } - StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes); - if (stateInternals == null) { - stateInternals = stateInternalsFactory.stateInternalsForKey(key); - perKeyStateInternals.put(keyBytes, stateInternals); - } - return stateInternals; - } - - private void registerActiveTimer(K key, TimerInternals.TimerData timer) { - final ByteBuffer keyBytes; - try { - keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); - } catch (CoderException e) { - throw new RuntimeException(e); - } - Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes); - if (timersForKey == null) { - timersForKey = new HashSet<>(); - } - timersForKey.add(timer); - activeTimers.put(keyBytes, timersForKey); - } - - private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) { - final ByteBuffer keyBytes; - try { - keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); - } catch (CoderException e) { - throw new RuntimeException(e); - } - Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes); - if (timersForKey != null) { - timersForKey.remove(timer); - if (timersForKey.isEmpty()) { - activeTimers.remove(keyBytes); - } else { - activeTimers.put(keyBytes, timersForKey); - } - } - } - - private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception { - this.inputWatermark = new Instant(mark.getTimestamp()); - Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess( - mark.getTimestamp()); - if (!timers.isEmpty()) { - for (ByteBuffer keyBytes : timers.keySet()) { - K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array()); - KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes)); - context.setElement(kwi, getStateInternalsForKey(kwi.key())); - fn.processElement(context); - } - } - } - - private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?, - KeyedWorkItem<K, V>>.ProcessContext { - - private final ApexTimerInternals timerInternals; - private StateInternals<K> stateInternals; - private KeyedWorkItem<K, V> element; - - public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function, - ApexTimerInternals timerInternals) { - function.super(); - this.timerInternals = checkNotNull(timerInternals); - } - - public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) { - this.element = element; - this.stateInternals = stateForKey; - } - - @Override - public KeyedWorkItem<K, V> element() { - return this.element; - } - - @Override - public Instant timestamp() { - throw new UnsupportedOperationException( - "timestamp() is not available when processing KeyedWorkItems."); - } - - @Override - public PipelineOptions getPipelineOptions() { - return serializedOptions.get(); - } - - @Override - public void output(KV<K, Iterable<V>> output) { - throw new UnsupportedOperationException( - "output() is not available when processing KeyedWorkItems."); - } - - @Override - public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) { - throw new UnsupportedOperationException( - "outputWithTimestamp() is not available when processing KeyedWorkItems."); - } - - @Override - public PaneInfo pane() { - throw new UnsupportedOperationException( - "pane() is not available when processing KeyedWorkItems."); - } - - @Override - public BoundedWindow window() { - throw new UnsupportedOperationException( - "window() is not available when processing KeyedWorkItems."); - } - - @Override - public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() { - return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() { - - @Override - public StateInternals<K> stateInternals() { - return stateInternals; - } - - @Override - public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo pane) { - if (traceTuples) { - LOG.debug("\nemitting {} timestamp {}\n", output, timestamp); - } - ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of( - WindowedValue.of(output, timestamp, windows, pane))); - } - - @Override - public TimerInternals timerInternals() { - return timerInternals; - } - - @Override - public Collection<? extends BoundedWindow> windows() { - throw new UnsupportedOperationException("windows() is not available in Streaming mode."); - } - - @Override - public PaneInfo pane() { - throw new UnsupportedOperationException("pane() is not available in Streaming mode."); - } - - @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, - Coder<T> elemCoder) throws IOException { - throw new RuntimeException("writePCollectionViewData() not available in Streaming mode."); - } - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - throw new RuntimeException("sideInput() is not available in Streaming mode."); - } - }; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - throw new RuntimeException("sideInput() is not supported in Streaming mode."); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - // ignore the side output, this can happen when a user does not register - // side outputs but then outputs using a freshly created TupleTag. - throw new RuntimeException("sideOutput() is not available when grouping by window."); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - sideOutput(tag, output); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( - String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - throw new UnsupportedOperationException(); - } - } - - /** - * An implementation of Beam's {@link TimerInternals}. - * - */ - public class ApexTimerInternals implements TimerInternals { - - @Override - public void setTimer(TimerData timerKey) { - registerActiveTimer(context.element().key(), timerKey); - } - - @Override - public void deleteTimer(TimerData timerKey) { - unregisterActiveTimer(context.element().key(), timerKey); - } - - @Override - public Instant currentProcessingTime() { - return Instant.now(); - } - - @Override - public Instant currentSynchronizedProcessingTime() { - // TODO Auto-generated method stub - return null; - } - - @Override - public Instant currentInputWatermarkTime() { - return inputWatermark; - } - - @Override - public Instant currentOutputWatermarkTime() { - // TODO Auto-generated method stub - return null; - } - - @Override - public void setTimer(StateNamespace namespace, String timerId, Instant target, - TimeDomain timeDomain) { - throw new UnsupportedOperationException("Setting timer by ID not yet supported."); - } - - @Override - public void deleteTimer(StateNamespace namespace, String timerId) { - throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported."); - } - - } - - private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable { - private static final long serialVersionUID = 1L; - - @Override - public StateInternals<K> stateInternalsForKey(K key) { - return getStateInternalsForKey(key); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java deleted file mode 100644 index 43384d6..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java +++ /dev/null @@ -1,375 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.functions; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.ApexRunner; -import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; -import org.apache.beam.runners.apex.translators.utils.NoOpStepContext; -import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions; -import org.apache.beam.runners.apex.translators.utils.ValueAndCoderKryoSerializable; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; -import org.apache.beam.runners.core.SideInputHandler; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.util.ExecutionContext; -import org.apache.beam.sdk.util.NullSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Apex operator for Beam {@link DoFn}. - */ -public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager { - private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class); - private boolean traceTuples = true; - - @Bind(JavaSerializer.class) - private final SerializablePipelineOptions pipelineOptions; - @Bind(JavaSerializer.class) - private final OldDoFn<InputT, OutputT> doFn; - @Bind(JavaSerializer.class) - private final TupleTag<OutputT> mainOutputTag; - @Bind(JavaSerializer.class) - private final List<TupleTag<?>> sideOutputTags; - @Bind(JavaSerializer.class) - private final WindowingStrategy<?, ?> windowingStrategy; - @Bind(JavaSerializer.class) - private final List<PCollectionView<?>> sideInputs; - - private final StateInternals<Void> sideInputStateInternals; - private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack; - private LongMin pushedBackWatermark = new LongMin(); - private long currentInputWatermark = Long.MIN_VALUE; - private long currentOutputWatermark = currentInputWatermark; - - private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner; - private transient SideInputHandler sideInputHandler; - private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping = - Maps.newHashMapWithExpectedSize(5); - - public ApexParDoOperator( - ApexPipelineOptions pipelineOptions, - OldDoFn<InputT, OutputT> doFn, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - WindowingStrategy<?, ?> windowingStrategy, - List<PCollectionView<?>> sideInputs, - Coder<WindowedValue<InputT>> inputCoder, - StateInternalsFactory<Void> stateInternalsFactory - ) { - this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions); - this.doFn = doFn; - this.mainOutputTag = mainOutputTag; - this.sideOutputTags = sideOutputTags; - this.windowingStrategy = windowingStrategy; - this.sideInputs = sideInputs; - this.sideInputStateInternals = stateInternalsFactory.stateInternalsForKey(null); - - if (sideOutputTags.size() > sideOutputPorts.length) { - String msg = String.format("Too many side outputs (currently only supporting %s).", - sideOutputPorts.length); - throw new UnsupportedOperationException(msg); - } - - Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder); - this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(), - coder); - - } - - @SuppressWarnings("unused") // for Kryo - private ApexParDoOperator() { - this.pipelineOptions = null; - this.doFn = null; - this.mainOutputTag = null; - this.sideOutputTags = null; - this.windowingStrategy = null; - this.sideInputs = null; - this.pushedBack = null; - this.sideInputStateInternals = null; - } - - public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = - new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { - @Override - public void process(ApexStreamTuple<WindowedValue<InputT>> t) { - if (t instanceof ApexStreamTuple.WatermarkTuple) { - processWatermark((ApexStreamTuple.WatermarkTuple<?>) t); - } else { - if (traceTuples) { - LOG.debug("\ninput {}\n", t.getValue()); - } - Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(t.getValue()); - for (WindowedValue<InputT> pushedBackValue : justPushedBack) { - pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis()); - pushedBack.get().add(pushedBackValue); - } - } - } - }; - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 = - new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() { - @Override - public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) { - if (t instanceof ApexStreamTuple.WatermarkTuple) { - // ignore side input watermarks - return; - } - - int sideInputIndex = 0; - if (t instanceof ApexStreamTuple.DataTuple) { - sideInputIndex = ((ApexStreamTuple.DataTuple<?>) t).getUnionTag(); - } - - if (traceTuples) { - LOG.debug("\nsideInput {} {}\n", sideInputIndex, t.getValue()); - } - - PCollectionView<?> sideInput = sideInputs.get(sideInputIndex); - sideInputHandler.addSideInputValue(sideInput, t.getValue()); - - List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); - for (WindowedValue<InputT> elem : pushedBack.get()) { - Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(elem); - Iterables.addAll(newPushedBack, justPushedBack); - } - - pushedBack.get().clear(); - pushedBackWatermark.clear(); - for (WindowedValue<InputT> pushedBackValue : newPushedBack) { - pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis()); - pushedBack.get().add(pushedBackValue); - } - - // potentially emit watermark - processWatermark(ApexStreamTuple.WatermarkTuple.of(currentInputWatermark)); - } - }; - - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>(); - - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 = - new DefaultOutputPort<>(); - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 = - new DefaultOutputPort<>(); - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 = - new DefaultOutputPort<>(); - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 = - new DefaultOutputPort<>(); - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 = - new DefaultOutputPort<>(); - - public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2, - sideOutput3, sideOutput4, sideOutput5}; - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) { - DefaultOutputPort<ApexStreamTuple<?>> sideOutputPort = sideOutputPortMapping.get(tag); - if (sideOutputPort != null) { - sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple)); - } else { - output.emit(ApexStreamTuple.DataTuple.of(tuple)); - } - if (traceTuples) { - LOG.debug("\nemitting {}\n", tuple); - } - } - - private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) { - try { - pushbackDoFnRunner.startBundle(); - Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner - .processElementInReadyWindows(elem); - pushbackDoFnRunner.finishBundle(); - return pushedBack; - } catch (UserCodeException ue) { - if (ue.getCause() instanceof AssertionError) { - ApexRunner.assertionError = (AssertionError) ue.getCause(); - } - throw ue; - } - } - - private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) { - this.currentInputWatermark = mark.getTimestamp(); - - if (sideInputs.isEmpty()) { - if (traceTuples) { - LOG.debug("\nemitting watermark {}\n", mark); - } - output.emit(mark); - return; - } - - long potentialOutputWatermark = - Math.min(pushedBackWatermark.get(), currentInputWatermark); - if (potentialOutputWatermark > currentOutputWatermark) { - currentOutputWatermark = potentialOutputWatermark; - if (traceTuples) { - LOG.debug("\nemitting watermark {}\n", currentOutputWatermark); - } - output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark)); - } - } - - @Override - public void setup(OperatorContext context) { - this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this); - SideInputReader sideInputReader = NullSideInputReader.of(sideInputs); - if (!sideInputs.isEmpty()) { - sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals); - sideInputReader = sideInputHandler; - } - - for (int i = 0; i < sideOutputTags.size(); i++) { - @SuppressWarnings("unchecked") - DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>) - sideOutputPorts[i]; - sideOutputPortMapping.put(sideOutputTags.get(i), port); - } - - DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault( - pipelineOptions.get(), - doFn, - sideInputReader, - this, - mainOutputTag, - sideOutputTags, - new NoOpStepContext(), - new NoOpAggregatorFactory(), - windowingStrategy - ); - - pushbackDoFnRunner = - PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); - - try { - doFn.setup(); - } catch (Exception e) { - Throwables.propagateIfPossible(e); - throw new RuntimeException(e); - } - - } - - @Override - public void beginWindow(long windowId) { - } - - @Override - public void endWindow() { - } - - /** - * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode. - * It is called from {@link org.apache.beam.sdk.util.SimpleDoFnRunner}. - */ - public static class NoOpAggregatorFactory implements AggregatorFactory { - - private NoOpAggregatorFactory() { - } - - @Override - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, ExecutionContext.StepContext step, - String name, CombineFn<InputT, AccumT, OutputT> combine) { - return new NoOpAggregator<InputT, OutputT>(); - } - - private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, - java.io.Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void addValue(InputT value) { - } - - @Override - public String getName() { - // TODO Auto-generated method stub - return null; - } - - @Override - public CombineFn<InputT, ?, OutputT> getCombineFn() { - // TODO Auto-generated method stub - return null; - } - - }; - } - - private static class LongMin { - long state = Long.MAX_VALUE; - - public void add(long l) { - state = Math.min(state, l); - } - - public long get() { - return state; - } - - public void clear() { - state = Long.MAX_VALUE; - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java deleted file mode 100644 index ecb0adb..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementation of the Beam runner for Apache Apex. - */ -package org.apache.beam.runners.apex.translators.functions; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java deleted file mode 100644 index 61236ca..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.translators.io; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.base.Throwables; - -import java.io.IOException; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; -import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.DataTuple; -import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Apex input operator that wraps Beam {@link UnboundedSource}. - */ -public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT - extends UnboundedSource.CheckpointMark> implements InputOperator { - private static final Logger LOG = LoggerFactory.getLogger( - ApexReadUnboundedInputOperator.class); - private boolean traceTuples = false; - private long outputWatermark = 0; - - @Bind(JavaSerializer.class) - private final SerializablePipelineOptions pipelineOptions; - @Bind(JavaSerializer.class) - private final UnboundedSource<OutputT, CheckpointMarkT> source; - private final boolean isBoundedSource; - private transient UnboundedSource.UnboundedReader<OutputT> reader; - private transient boolean available = false; - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = - new DefaultOutputPort<>(); - - public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, - ApexPipelineOptions options) { - this.pipelineOptions = new SerializablePipelineOptions(options); - this.source = source; - this.isBoundedSource = false; - } - - public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, - boolean isBoundedSource, ApexPipelineOptions options) { - this.pipelineOptions = new SerializablePipelineOptions(options); - this.source = source; - this.isBoundedSource = isBoundedSource; - } - - @SuppressWarnings("unused") // for Kryo - private ApexReadUnboundedInputOperator() { - this.pipelineOptions = null; this.source = null; this.isBoundedSource = false; - } - - @Override - public void beginWindow(long windowId) { - if (!available && (isBoundedSource || source instanceof ValuesSource)) { - // if it's a Create and the input was consumed, emit final watermark - emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis()); - // terminate the stream (allows tests to finish faster) - BaseOperator.shutdown(); - } else { - emitWatermarkIfNecessary(reader.getWatermark().getMillis()); - } - } - - private void emitWatermarkIfNecessary(long mark) { - if (mark > outputWatermark) { - outputWatermark = mark; - if (traceTuples) { - LOG.debug("\nemitting watermark {}\n", mark); - } - output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark)); - } - } - - @Override - public void endWindow() { - } - - @Override - public void setup(OperatorContext context) { - this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this); - try { - reader = source.createReader(this.pipelineOptions.get(), null); - available = reader.start(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void teardown() { - try { - if (reader != null) { - reader.close(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void emitTuples() { - try { - if (!available) { - available = reader.advance(); - } - if (available) { - OutputT data = reader.getCurrent(); - Instant timestamp = reader.getCurrentTimestamp(); - available = reader.advance(); - if (traceTuples) { - LOG.debug("\nemitting '{}' timestamp {}\n", data, timestamp); - } - output.emit(DataTuple.of(WindowedValue.of( - data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING))); - } - } catch (Exception e) { - Throwables.propagateIfPossible(e); - throw new RuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java deleted file mode 100644 index fadf8ec4..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.translators.io; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.joda.time.Instant; - -/** - * Unbounded source that reads from a Java {@link Iterable}. - */ -public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { - private static final long serialVersionUID = 1L; - - private final byte[] codedValues; - private final IterableCoder<T> iterableCoder; - - public ValuesSource(Iterable<T> values, Coder<T> coder) { - this.iterableCoder = IterableCoder.of(coder); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - iterableCoder.encode(values, bos, Context.OUTER); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - this.codedValues = bos.toByteArray(); - } - - @Override - public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) throws Exception { - return Collections.singletonList(this); - } - - @Override - public UnboundedReader<T> createReader(PipelineOptions options, - @Nullable CheckpointMark checkpointMark) { - ByteArrayInputStream bis = new ByteArrayInputStream(codedValues); - try { - Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER); - return new ValuesReader<>(values, this); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - @Nullable - @Override - public Coder<CheckpointMark> getCheckpointMarkCoder() { - return null; - } - - @Override - public void validate() { - } - - @Override - public Coder<T> getDefaultOutputCoder() { - return iterableCoder.getElemCoder(); - } - - private static class ValuesReader<T> extends UnboundedReader<T> { - - private final Iterable<T> values; - private final UnboundedSource<T, CheckpointMark> source; - private transient Iterator<T> iterator; - private T current; - - public ValuesReader(Iterable<T> values, UnboundedSource<T, CheckpointMark> source) { - this.values = values; - this.source = source; - } - - @Override - public boolean start() throws IOException { - if (null == iterator) { - iterator = values.iterator(); - } - return advance(); - } - - @Override - public boolean advance() throws IOException { - if (iterator.hasNext()) { - current = iterator.next(); - return true; - } else { - return false; - } - } - - @Override - public T getCurrent() throws NoSuchElementException { - return current; - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return Instant.now(); - } - - @Override - public void close() throws IOException { - } - - @Override - public Instant getWatermark() { - return Instant.now(); - } - - @Override - public CheckpointMark getCheckpointMark() { - return null; - } - - @Override - public UnboundedSource<T, ?> getCurrentSource() { - return source; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java deleted file mode 100644 index 0d17f19..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementation of the Beam runner for Apache Apex. - */ -package org.apache.beam.runners.apex.translators.io; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java deleted file mode 100644 index 7d7c6cc..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementation of the Beam runner for Apache Apex. - */ -package org.apache.beam.runners.apex.translators; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java deleted file mode 100644 index edc1220..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java +++ /dev/null @@ -1,438 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.utils; - -import com.esotericsoftware.kryo.DefaultSerializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.collect.HashBasedTable; -import com.google.common.collect.Table; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; -import org.apache.beam.sdk.coders.InstantCoder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.util.CombineFnUtil; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateContext; -import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTag.StateBinder; -import org.apache.beam.sdk.util.state.ValueState; -import org.apache.beam.sdk.util.state.WatermarkHoldState; -import org.joda.time.Instant; - -/** - * Implementation of {@link StateInternals} that can be serialized and - * checkpointed with the operator. Suitable for small states, in the future this - * should be based on the incremental state saving components in the Apex - * library. - */ -@DefaultSerializer(JavaSerializer.class) -public class ApexStateInternals<K> implements StateInternals<K>, Serializable { - private static final long serialVersionUID = 1L; - public static <K> ApexStateInternals<K> forKey(K key) { - return new ApexStateInternals<>(key); - } - - private final K key; - - protected ApexStateInternals(K key) { - this.key = key; - } - - @Override - public K getKey() { - return key; - } - - /** - * Serializable state for internals (namespace -> state tag -> coded value). - */ - private final Table<String, String, byte[]> stateTable = HashBasedTable.create(); - - @Override - public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) { - return state(namespace, address, StateContexts.nullContext()); - } - - @Override - public <T extends State> T state( - StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) { - return address.bind(new ApexStateBinder(key, namespace, address, c)); - } - - /** - * A {@link StateBinder} that returns {@link State} wrappers for serialized state. - */ - private class ApexStateBinder implements StateBinder<K> { - private final K key; - private final StateNamespace namespace; - private final StateContext<?> c; - - private ApexStateBinder(K key, StateNamespace namespace, StateTag<? super K, ?> address, - StateContext<?> c) { - this.key = key; - this.namespace = namespace; - this.c = c; - } - - @Override - public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, Coder<T> coder) { - return new ApexValueState<T>(namespace, address, coder); - } - - @Override - public <T> BagState<T> bindBag( - final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { - return new ApexBagState<T>(namespace, address, elemCoder); - } - - @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> - bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - final CombineFn<InputT, AccumT, OutputT> combineFn) { - return new ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>( - namespace, - address, - accumCoder, - key, - combineFn.<K>asKeyedFn() - ); - } - - @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { - return new ApexWatermarkHoldState<W>(namespace, address, outputTimeFn); - } - - @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return new ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>( - namespace, - address, - accumCoder, - key, combineFn); - } - - @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { - return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); - } - } - - private class AbstractState<T> { - protected final StateNamespace namespace; - protected final StateTag<?, ? extends State> address; - protected final Coder<T> coder; - - private AbstractState( - StateNamespace namespace, - StateTag<?, ? extends State> address, - Coder<T> coder) { - this.namespace = namespace; - this.address = address; - this.coder = coder; - } - - protected T readValue() { - T value = null; - byte[] buf = stateTable.get(namespace.stringKey(), address.getId()); - if (buf != null) { - // TODO: reuse input - Input input = new Input(buf); - try { - return coder.decode(input, Context.OUTER); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return value; - } - - public void writeValue(T input) { - ByteArrayOutputStream output = new ByteArrayOutputStream(); - try { - coder.encode(input, output, Context.OUTER); - stateTable.put(namespace.stringKey(), address.getId(), output.toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public void clear() { - stateTable.remove(namespace.stringKey(), address.getId()); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - @SuppressWarnings("unchecked") - AbstractState<?> that = (AbstractState<?>) o; - return namespace.equals(that.namespace) && address.equals(that.address); - } - - @Override - public int hashCode() { - int result = namespace.hashCode(); - result = 31 * result + address.hashCode(); - return result; - } - } - - private class ApexValueState<T> extends AbstractState<T> implements ValueState<T> { - - private ApexValueState( - StateNamespace namespace, - StateTag<?, ValueState<T>> address, - Coder<T> coder) { - super(namespace, address, coder); - } - - @Override - public ApexValueState<T> readLater() { - return this; - } - - @Override - public T read() { - return readValue(); - } - - @Override - public void write(T input) { - writeValue(input); - } - } - - private final class ApexWatermarkHoldState<W extends BoundedWindow> - extends AbstractState<Instant> implements WatermarkHoldState<W> { - - private final OutputTimeFn<? super W> outputTimeFn; - - public ApexWatermarkHoldState( - StateNamespace namespace, - StateTag<?, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { - super(namespace, address, InstantCoder.of()); - this.outputTimeFn = outputTimeFn; - } - - @Override - public ApexWatermarkHoldState<W> readLater() { - return this; - } - - @Override - public Instant read() { - return readValue(); - } - - @Override - public void add(Instant outputTime) { - Instant combined = read(); - combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime); - writeValue(combined); - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public ReadableState<Boolean> readLater() { - return this; - } - @Override - public Boolean read() { - return stateTable.get(namespace.stringKey(), address.getId()) == null; - } - }; - } - - @Override - public OutputTimeFn<? super W> getOutputTimeFn() { - return outputTimeFn; - } - - } - - private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> - extends AbstractState<AccumT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT> { - private final K key; - private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; - - private ApexAccumulatorCombiningState(StateNamespace namespace, - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> coder, - K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - super(namespace, address, coder); - this.key = key; - this.combineFn = combineFn; - } - - @Override - public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() { - return this; - } - - @Override - public OutputT read() { - return combineFn.extractOutput(key, getAccum()); - } - - @Override - public void add(InputT input) { - AccumT accum = getAccum(); - combineFn.addInput(key, accum, input); - writeValue(accum); - } - - @Override - public AccumT getAccum() { - AccumT accum = readValue(); - if (accum == null) { - accum = combineFn.createAccumulator(key); - } - return accum; - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public ReadableState<Boolean> readLater() { - return this; - } - @Override - public Boolean read() { - return stateTable.get(namespace.stringKey(), address.getId()) == null; - } - }; - } - - @Override - public void addAccum(AccumT accum) { - accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum)); - writeValue(accum); - } - - @Override - public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { - return combineFn.mergeAccumulators(key, accumulators); - } - - } - - private final class ApexBagState<T> extends AbstractState<List<T>> implements BagState<T> { - private ApexBagState( - StateNamespace namespace, - StateTag<?, BagState<T>> address, - Coder<T> coder) { - super(namespace, address, ListCoder.of(coder)); - } - - @Override - public ApexBagState<T> readLater() { - return this; - } - - @Override - public List<T> read() { - List<T> value = super.readValue(); - if (value == null) { - value = new ArrayList<T>(); - } - return value; - } - - @Override - public void add(T input) { - List<T> value = read(); - value.add(input); - writeValue(value); - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public ReadableState<Boolean> readLater() { - return this; - } - - @Override - public Boolean read() { - return stateTable.get(namespace.stringKey(), address.getId()) == null; - } - }; - } - } - - /** - * Factory for {@link ApexStateInternals}. - * - * @param <K> - */ - public static class ApexStateInternalsFactory<K> - implements StateInternalsFactory<K>, Serializable { - private static final long serialVersionUID = 1L; - - @Override - public StateInternals<K> stateInternalsForKey(K key) { - return ApexStateInternals.forKey(key); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java deleted file mode 100644 index 25518dc..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.utils; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.datatorrent.api.Operator; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StandardCoder; - -/** - * The common interface for all objects transmitted through streams. - * - * @param <T> The actual payload type. - */ -public interface ApexStreamTuple<T> { - /** - * Gets the value of the tuple. - * - * @return - */ - T getValue(); - - /** - * Data tuple class. - * - * @param <T> - */ - class DataTuple<T> implements ApexStreamTuple<T> { - private int unionTag; - private T value; - - public static <T> DataTuple<T> of(T value) { - return new DataTuple<>(value, 0); - } - - private DataTuple(T value, int unionTag) { - this.value = value; - this.unionTag = unionTag; - } - - @Override - public T getValue() { - return value; - } - - public void setValue(T value) { - this.value = value; - } - - public int getUnionTag() { - return unionTag; - } - - public void setUnionTag(int unionTag) { - this.unionTag = unionTag; - } - - @Override - public String toString() { - return value.toString(); - } - - } - - /** - * Tuple that includes a timestamp. - * - * @param <T> - */ - class TimestampedTuple<T> extends DataTuple<T> { - private long timestamp; - - public TimestampedTuple(long timestamp, T value) { - super(value, 0); - this.timestamp = timestamp; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - @Override - public int hashCode() { - return Objects.hash(timestamp); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof TimestampedTuple)) { - return false; - } else { - TimestampedTuple<?> other = (TimestampedTuple<?>) obj; - return (timestamp == other.timestamp) && Objects.equals(this.getValue(), other.getValue()); - } - } - - } - - /** - * Tuple that represents a watermark. - * - * @param <T> - */ - class WatermarkTuple<T> extends TimestampedTuple<T> { - public static <T> WatermarkTuple<T> of(long timestamp) { - return new WatermarkTuple<>(timestamp); - } - - protected WatermarkTuple(long timestamp) { - super(timestamp, null); - } - - @Override - public String toString() { - return "[Watermark " + getTimestamp() + "]"; - } - } - - /** - * Coder for {@link ApexStreamTuple}. - */ - class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> { - private static final long serialVersionUID = 1L; - final Coder<T> valueCoder; - - public static <T> ApexStreamTupleCoder<T> of(Coder<T> valueCoder) { - return new ApexStreamTupleCoder<>(valueCoder); - } - - protected ApexStreamTupleCoder(Coder<T> valueCoder) { - this.valueCoder = checkNotNull(valueCoder); - } - - @Override - public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context) - throws CoderException, IOException { - if (value instanceof WatermarkTuple) { - outStream.write(1); - new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp()); - } else { - outStream.write(0); - outStream.write(((DataTuple<?>) value).unionTag); - valueCoder.encode(value.getValue(), outStream, context); - } - } - - @Override - public ApexStreamTuple<T> decode(InputStream inStream, Context context) - throws CoderException, IOException { - int b = inStream.read(); - if (b == 1) { - return new WatermarkTuple<>(new DataInputStream(inStream).readLong()); - } else { - int unionTag = inStream.read(); - return new DataTuple<>(valueCoder.decode(inStream, context), unionTag); - } - } - - @Override - public List<? extends Coder<?>> getCoderArguments() { - return Arrays.<Coder<?>>asList(valueCoder); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - verifyDeterministic( - this.getClass().getSimpleName() + " requires a deterministic valueCoder", - valueCoder); - } - - /** - * Returns the value coder. - */ - public Coder<T> getValueCoder() { - return valueCoder; - } - - } - - /** - * Central if data tuples received on and emitted from ports should be logged. - * Should be called in setup and value cached in operator. - */ - final class Logging { - public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) { - return options.isTupleTracingEnabled(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java deleted file mode 100644 index 61e3b83..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.utils; - -import com.datatorrent.api.StreamCodec; -import com.datatorrent.netlet.util.Slice; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.Context; - -/** - * The Apex {@link StreamCodec} adapter for using Beam {@link Coder}. - */ -public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializable { - private static final long serialVersionUID = 1L; - private final Coder<? super Object> coder; - - public CoderAdapterStreamCodec(Coder<? super Object> coder) { - this.coder = coder; - } - - @Override - public Object fromByteArray(Slice fragment) { - ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset, - fragment.length); - try { - return coder.decode(bis, Context.OUTER); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public Slice toByteArray(Object wv) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - coder.encode(wv, bos, Context.OUTER); - } catch (IOException e) { - throw new RuntimeException(e); - } - return new Slice(bos.toByteArray()); - } - - @Override - public int getPartition(Object o) { - return o.hashCode(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java deleted file mode 100644 index 3b19c37..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.utils; - -import java.io.IOException; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ExecutionContext; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Serializable {@link ExecutionContext.StepContext} that does nothing. - */ -public class NoOpStepContext implements ExecutionContext.StepContext, Serializable { - private static final long serialVersionUID = 1L; - - @Override - public String getStepName() { - return null; - } - - @Override - public String getTransformName() { - return null; - } - - @Override - public void noteOutput(WindowedValue<?> output) { - } - - @Override - public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { - } - - @Override - public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, - Iterable<WindowedValue<T>> data, - Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws - IOException { - - } - - @Override - public StateInternals<?> stateInternals() { - return null; - } - - @Override - public TimerInternals timerInternals() { - return null; - } -}