http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java index 247fe25..9bf4eb4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java @@ -1,41 +1,41 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - + */ + +package org.apache.flink.streaming.runtime.io; + import java.util.concurrent.BlockingQueue; import org.apache.flink.runtime.iterative.concurrent.Broker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -@SuppressWarnings("rawtypes") -public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> { - /** - * Singleton instance - */ - private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker(); - - private BlockingQueueBroker() { - } - - /** - * retrieve singleton instance - */ - public static Broker<BlockingQueue<StreamRecord>> instance() { - return INSTANCE; - } -} + +@SuppressWarnings("rawtypes") +public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> { + /** + * Singleton instance + */ + private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker(); + + private BlockingQueueBroker() { + } + + /** + * retrieve singleton instance + */ + public static Broker<BlockingQueue<StreamRecord>> instance() { + return INSTANCE; + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java deleted file mode 100644 index 4358810..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.plugable.DeserializationDelegate; -import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate; - -/** - * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two - * input types. 
- */ -public class CoReaderIterator<T1, T2> { - - private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the - // source - - protected final ReusingDeserializationDelegate<T1> delegate1; - protected final ReusingDeserializationDelegate<T2> delegate2; - - public CoReaderIterator( - CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader, - TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) { - this.reader = reader; - this.delegate1 = new ReusingDeserializationDelegate<T1>(serializer1); - this.delegate2 = new ReusingDeserializationDelegate<T2>(serializer2); - } - - public int next(T1 target1, T2 target2) throws IOException { - this.delegate1.setInstance(target1); - this.delegate2.setInstance(target2); - - try { - return this.reader.getNextRecord(this.delegate1, this.delegate2); - - } catch (InterruptedException e) { - throw new IOException("Reader interrupted.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java deleted file mode 100644 index a7139b6..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.concurrent.LinkedBlockingDeque; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; -import org.apache.flink.runtime.io.network.api.reader.AbstractReader; -import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; -import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; -import org.apache.flink.runtime.util.event.EventListener; -import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep; - -/** - * A CoRecordReader wraps {@link MutableRecordReader}s of two different input - * types to read records effectively. 
- */ -@SuppressWarnings("rawtypes") -public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends - AbstractReader implements EventListener<InputGate>, StreamingReader { - - private final InputGate bufferReader1; - - private final InputGate bufferReader2; - - private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<Integer>(); - - private LinkedList<Integer> processed = new LinkedList<Integer>(); - - private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers; - - private RecordDeserializer<T1> reader1currentRecordDeserializer; - - private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers; - - private RecordDeserializer<T2> reader2currentRecordDeserializer; - - // 0 => none, 1 => reader (T1), 2 => reader (T2) - private int currentReaderIndex; - - private boolean hasRequestedPartitions; - - protected CoBarrierBuffer barrierBuffer1; - protected CoBarrierBuffer barrierBuffer2; - - public CoRecordReader(InputGate inputgate1, InputGate inputgate2) { - super(new UnionInputGate(inputgate1, inputgate2)); - - this.bufferReader1 = inputgate1; - this.bufferReader2 = inputgate2; - - this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate1 - .getNumberOfInputChannels()]; - this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate2 - .getNumberOfInputChannels()]; - - for (int i = 0; i < reader1RecordDeserializers.length; i++) { - reader1RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T1>(); - } - - for (int i = 0; i < reader2RecordDeserializers.length; i++) { - reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>(); - } - - inputgate1.registerListener(this); - inputgate2.registerListener(this); - - barrierBuffer1 = new CoBarrierBuffer(inputgate1, this); - barrierBuffer2 = new CoBarrierBuffer(inputgate2, this); - - barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2); - 
barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1); - } - - public void requestPartitionsOnce() throws IOException, InterruptedException { - if (!hasRequestedPartitions) { - bufferReader1.requestPartitions(); - bufferReader2.requestPartitions(); - - hasRequestedPartitions = true; - } - } - - @SuppressWarnings("unchecked") - protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException { - - requestPartitionsOnce(); - - while (true) { - if (currentReaderIndex == 0) { - if ((bufferReader1.isFinished() && bufferReader2.isFinished())) { - return 0; - } - - currentReaderIndex = getNextReaderIndexBlocking(); - - } - - if (currentReaderIndex == 1) { - while (true) { - if (reader1currentRecordDeserializer != null) { - RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer - .getNextRecord(target1); - - if (result.isBufferConsumed()) { - reader1currentRecordDeserializer.getCurrentBuffer().recycle(); - reader1currentRecordDeserializer = null; - - currentReaderIndex = 0; - } - - if (result.isFullRecord()) { - return 1; - } - } else { - - final BufferOrEvent boe = barrierBuffer1.getNextNonBlocked(); - - if (boe.isBuffer()) { - reader1currentRecordDeserializer = reader1RecordDeserializers[boe - .getChannelIndex()]; - reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer()); - } else if (boe.getEvent() instanceof StreamingSuperstep) { - barrierBuffer1.processSuperstep(boe); - currentReaderIndex = 0; - - break; - } else if (handleEvent(boe.getEvent())) { - currentReaderIndex = 0; - - break; - } - } - } - } else if (currentReaderIndex == 2) { - while (true) { - if (reader2currentRecordDeserializer != null) { - RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer - .getNextRecord(target2); - - if (result.isBufferConsumed()) { - reader2currentRecordDeserializer.getCurrentBuffer().recycle(); - reader2currentRecordDeserializer = null; - - currentReaderIndex = 0; - } - - if 
(result.isFullRecord()) { - return 2; - } - } else { - final BufferOrEvent boe = barrierBuffer2.getNextNonBlocked(); - - if (boe.isBuffer()) { - reader2currentRecordDeserializer = reader2RecordDeserializers[boe - .getChannelIndex()]; - reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer()); - } else if (boe.getEvent() instanceof StreamingSuperstep) { - barrierBuffer2.processSuperstep(boe); - currentReaderIndex = 0; - - break; - } else if (handleEvent(boe.getEvent())) { - currentReaderIndex = 0; - - break; - } - } - } - } else { - throw new IllegalStateException("Bug: unexpected current reader index."); - } - } - } - - protected int getNextReaderIndexBlocking() throws InterruptedException { - - Integer nextIndex = 0; - - while (processed.contains(nextIndex = availableRecordReaders.take())) { - processed.remove(nextIndex); - } - - if (nextIndex == 1) { - if (barrierBuffer1.isAllBlocked()) { - availableRecordReaders.addFirst(1); - processed.add(2); - return 2; - } else { - return 1; - } - } else { - if (barrierBuffer2.isAllBlocked()) { - availableRecordReaders.addFirst(2); - processed.add(1); - return 1; - } else { - return 2; - } - - } - - } - - // ------------------------------------------------------------------------ - // Data availability notifications - // ------------------------------------------------------------------------ - - @Override - public void onEvent(InputGate bufferReader) { - addToAvailable(bufferReader); - } - - protected void addToAvailable(InputGate bufferReader) { - if (bufferReader == bufferReader1) { - availableRecordReaders.add(1); - } else if (bufferReader == bufferReader2) { - availableRecordReaders.add(2); - } - } - - public void clearBuffers() { - for (RecordDeserializer<?> deserializer : reader1RecordDeserializers) { - Buffer buffer = deserializer.getCurrentBuffer(); - if (buffer != null && !buffer.isRecycled()) { - buffer.recycle(); - } - } - for (RecordDeserializer<?> deserializer : reader2RecordDeserializers) { - Buffer 
buffer = deserializer.getCurrentBuffer(); - if (buffer != null && !buffer.isRecycled()) { - buffer.recycle(); - } - } - } - - @Override - public void setReporter(AccumulatorRegistry.Reporter reporter) { - for (AdaptiveSpanningRecordDeserializer serializer : reader1RecordDeserializers) { - serializer.setReporter(reporter); - } - for (AdaptiveSpanningRecordDeserializer serializer : reader2RecordDeserializers) { - serializer.setReporter(reporter); - } - } - - private class CoBarrierBuffer extends BarrierBuffer { - - private CoBarrierBuffer otherBuffer; - - public CoBarrierBuffer(InputGate inputGate, AbstractReader reader) { - super(inputGate, reader); - } - - public void setOtherBarrierBuffer(CoBarrierBuffer other) { - this.otherBuffer = other; - } - - @Override - protected void actOnAllBlocked() { - if (otherBuffer.isAllBlocked()) { - super.actOnAllBlocked(); - otherBuffer.releaseBlocks(); - } - } - - } - - public void cleanup() throws IOException { - try { - barrierBuffer1.cleanup(); - } finally { - barrierBuffer2.cleanup(); - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java new file mode 100644 index 0000000..2f9d1d6 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; + +public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> { + + private OutputSelectorWrapper<OUT> outputSelectorWrapper; + + private List<Output<OUT>> allOutputs; + + public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) { + this.outputSelectorWrapper = outputSelectorWrapper; + allOutputs = new ArrayList<Output<OUT>>(); + } + + public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) { + outputSelectorWrapper.addCollector(output, edge); + allOutputs.add((Output) output); + } + + @Override + public void collect(StreamRecord<OUT> record) { + for (Collector<StreamRecord<OUT>> output : outputSelectorWrapper.getSelectedOutputs(record.getValue())) { + output.collect(record); + } + } + + @Override + public void emitWatermark(Watermark mark) { + for (Output<OUT> output : allOutputs) { + output.emitWatermark(mark); + } + } + + @Override + public void close() { + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java deleted file mode 100644 index 7f2a9c5..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.io; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; - -public class IndexedMutableReader<T extends IOReadableWritable> extends - StreamingMutableRecordReader<T> { - - InputGate reader; - - public IndexedMutableReader(InputGate reader) { - super(reader); - this.reader = reader; - } - - public int getNumberOfInputChannels() { - return reader.getNumberOfInputChannels(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java deleted file mode 100644 index 2050e27..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.operators.util.ReaderIterator; -import org.apache.flink.runtime.plugable.DeserializationDelegate; - -public class IndexedReaderIterator<T> extends ReaderIterator<T> { - - public IndexedReaderIterator( - IndexedMutableReader<DeserializationDelegate<T>> reader, - TypeSerializer<T> serializer) { - - super(reader, serializer); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java deleted file mode 100644 index 7883251..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - -import java.util.Collection; - -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; - -public class InputGateFactory { - - public static InputGate createInputGate(Collection<InputGate> inputGates) { - return createInputGate(inputGates.toArray(new InputGate[inputGates.size()])); - } - - public static InputGate createInputGate(InputGate[] inputGates) { - if (inputGates.length <= 0) { - throw new RuntimeException("No such input gate."); - } - - if (inputGates.length < 2) { - return inputGates[0]; - } else { - return new UnionInputGate(inputGates); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java new file mode 100644 index 0000000..01e16fb --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; + +/** + * Utility for dealing with input gates. This will either just return + * the single {@link InputGate} that was passed in or create a {@link UnionInputGate} if several + * {@link InputGate input gates} are given. 
+ */ +public class InputGateUtil { + + public static InputGate createInputGate(Collection<InputGate> inputGates1, Collection<InputGate> inputGates2) { + List<InputGate> gates = new ArrayList<InputGate>(inputGates1.size() + inputGates2.size()); + gates.addAll(inputGates1); + gates.addAll(inputGates2); + return createInputGate(gates.toArray(new InputGate[gates.size()])); + } + + public static InputGate createInputGate(InputGate[] inputGates) { + if (inputGates.length <= 0) { + throw new RuntimeException("No such input gate."); + } + + if (inputGates.length < 2) { + return inputGates[0]; + } else { + return new UnionInputGate(inputGates); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java new file mode 100644 index 0000000..e9cbb7d --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link Output} that sends data using a {@link RecordWriter}. 
+ */ +public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { + + private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class); + + private RecordWriter<SerializationDelegate> recordWriter; + private SerializationDelegate serializationDelegate; + + @SuppressWarnings("unchecked") + public RecordWriterOutput( + RecordWriter<SerializationDelegate> recordWriter, + TypeSerializer<OUT> outSerializer, + boolean enableWatermarkMultiplexing) { + Preconditions.checkNotNull(recordWriter); + + this.recordWriter = recordWriter; + + StreamRecordSerializer<OUT> outRecordSerializer; + if (enableWatermarkMultiplexing) { + outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer); + } else { + outRecordSerializer = new StreamRecordSerializer<OUT>(outSerializer); + } + + if (outSerializer != null) { + serializationDelegate = new SerializationDelegate(outRecordSerializer); + } + } + + @Override + @SuppressWarnings("unchecked") + public void collect(StreamRecord<OUT> record) { + serializationDelegate.setInstance(record); + + try { + recordWriter.emit(serializationDelegate); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Emit failed: {}", e); + } + throw new RuntimeException("Element emission failed.", e); + } + } + + @Override + @SuppressWarnings("unchecked") + public void emitWatermark(Watermark mark) { + serializationDelegate.setInstance(mark); + try { + recordWriter.broadcastEmit(serializationDelegate); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Watermark emit failed: {}", e); + } + throw new RuntimeException(e); + } + } + + @Override + public void close() { + if (recordWriter instanceof StreamRecordWriter) { + ((StreamRecordWriter) recordWriter).close(); + } else { + try { + recordWriter.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public void clearBuffers() { + recordWriter.clearBuffers(); + } + + public void broadcastEvent(TaskEvent 
barrier) throws IOException, InterruptedException { + recordWriter.broadcastEvent(barrier); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java new file mode 100644 index 0000000..e665710 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import java.io.IOException; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.io.network.api.reader.AbstractReader; +import org.apache.flink.runtime.io.network.api.reader.ReaderBase; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}. + * + * <p> + * This also keeps track of {@link Watermark} events and forwards them to event subscribers + * once the {@link Watermark} from all inputs advances. + * + * @param <IN> The type of the record that can be read with this record reader. 
+ */ +public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBase, StreamingReader { + + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class); + + private final RecordDeserializer<DeserializationDelegate>[] recordDeserializers; + + private RecordDeserializer<DeserializationDelegate> currentRecordDeserializer; + + // We need to keep track of the channel from which a buffer came, so that we can + // appropriately map the watermarks to input channels + int currentChannel = -1; + + private boolean isFinished; + + private final BarrierBuffer barrierBuffer; + + private long[] watermarks; + private long lastEmittedWatermark; + + private DeserializationDelegate deserializationDelegate; + + @SuppressWarnings("unchecked") + public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) { + super(InputGateUtil.createInputGate(inputGates)); + + barrierBuffer = new BarrierBuffer(inputGate, this); + + StreamRecordSerializer<IN> inputRecordSerializer; + if (enableWatermarkMultiplexing) { + inputRecordSerializer = new MultiplexingStreamRecordSerializer<IN>(inputSerializer); + } else { + inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer); + } + this.deserializationDelegate = new NonReusingDeserializationDelegate(inputRecordSerializer); + + // Initialize one deserializer per input channel + this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; + for (int i = 0; i < recordDeserializers.length; i++) { + recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate>(); + } + + watermarks = new long[inputGate.getNumberOfInputChannels()]; + for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) { + watermarks[i] = Long.MIN_VALUE; + } + lastEmittedWatermark = Long.MIN_VALUE; + } + + @SuppressWarnings("unchecked") + public 
boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws Exception { + if (isFinished) { + return false; + } + + while (true) { + if (currentRecordDeserializer != null) { + DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate); + + if (result.isBufferConsumed()) { + currentRecordDeserializer.getCurrentBuffer().recycle(); + currentRecordDeserializer = null; + } + + if (result.isFullRecord()) { + Object recordOrWatermark = deserializationDelegate.getInstance(); + + if (recordOrWatermark instanceof Watermark) { + Watermark mark = (Watermark) recordOrWatermark; + long watermarkMillis = mark.getTimestamp(); + if (watermarkMillis > watermarks[currentChannel]) { + watermarks[currentChannel] = watermarkMillis; + long newMinWatermark = Long.MAX_VALUE; + for (long watermark : watermarks) { + if (watermark < newMinWatermark) { + newMinWatermark = watermark; + } + } + if (newMinWatermark > lastEmittedWatermark) { + lastEmittedWatermark = newMinWatermark; + streamOperator.processWatermark(new Watermark(lastEmittedWatermark)); + } + } + continue; + } else { + // now we can do the actual processing + StreamRecord<IN> record = (StreamRecord<IN>) deserializationDelegate.getInstance(); + StreamingRuntimeContext ctx = streamOperator.getRuntimeContext(); + if (ctx != null) { + ctx.setNextInput(record); + } + streamOperator.processElement(record); + return true; + } + } + } + + final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked(); + + if (bufferOrEvent.isBuffer()) { + currentChannel = bufferOrEvent.getChannelIndex(); + currentRecordDeserializer = recordDeserializers[currentChannel]; + currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); + } else { + // Event received + final AbstractEvent event = bufferOrEvent.getEvent(); + + if (event instanceof CheckpointBarrier) { + barrierBuffer.processBarrier(bufferOrEvent); + } else { + if (handleEvent(event)) { + if (inputGate.isFinished()) { + if 
(!barrierBuffer.isEmpty()) { + throw new RuntimeException("BarrierBuffer should be empty at this point"); + } + isFinished = true; + return false; + } else if (hasReachedEndOfSuperstep()) { + return false; + } // else: More data is coming... + } + } + } + } + } + + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + for (RecordDeserializer<?> deserializer : recordDeserializers) { + deserializer.setReporter(reporter); + } + } + + public void clearBuffers() { + for (RecordDeserializer<?> deserializer : recordDeserializers) { + Buffer buffer = deserializer.getCurrentBuffer(); + if (buffer != null && !buffer.isRecycled()) { + buffer.recycle(); + } + } + } + + public void cleanup() throws IOException { + barrierBuffer.cleanup(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java index c212346..abae9a4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java @@ -61,6 +61,14 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit } } + @Override + public void broadcastEmit(T record) throws IOException, InterruptedException { + super.broadcastEmit(record); + if (flushAlways) { + flush(); + } + } + public void close() { try { if (outputFlusher != null) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java new file mode 100644 index 0000000..1fe98bb --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.io.network.api.reader.AbstractReader; +import org.apache.flink.runtime.io.network.api.reader.ReaderBase; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; + +/** + * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. + * + * <p> + * This also keeps track of {@link org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to event subscribers + * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from all inputs advances. 
+ * + * @param <IN1> The type of the records that arrive on the first input + * @param <IN2> The type of the records that arrive on the second input + */ +public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements ReaderBase, StreamingReader { + + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class); + + private final RecordDeserializer[] recordDeserializers; + + private RecordDeserializer currentRecordDeserializer; + + // We need to keep track of the channel from which a buffer came, so that we can + // appropriately map the watermarks to input channels + int currentChannel = -1; + + private boolean isFinished; + + private final BarrierBuffer barrierBuffer; + + private long[] watermarks1; + private long lastEmittedWatermark1; + + private long[] watermarks2; + private long lastEmittedWatermark2; + + private int numInputChannels1; + private int numInputChannels2; + + private DeserializationDelegate deserializationDelegate1; + private DeserializationDelegate deserializationDelegate2; + + @SuppressWarnings("unchecked") + public StreamTwoInputProcessor( + Collection<InputGate> inputGates1, + Collection<InputGate> inputGates2, + TypeSerializer<IN1> inputSerializer1, + TypeSerializer<IN2> inputSerializer2, + boolean enableWatermarkMultiplexing) { + super(InputGateUtil.createInputGate(inputGates1, inputGates2)); + + barrierBuffer = new BarrierBuffer(inputGate, this); + + StreamRecordSerializer<IN1> inputRecordSerializer1; + if (enableWatermarkMultiplexing) { + inputRecordSerializer1 = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1); + } else { + inputRecordSerializer1 = new StreamRecordSerializer<IN1>(inputSerializer1); + } + this.deserializationDelegate1 = new NonReusingDeserializationDelegate(inputRecordSerializer1); + + StreamRecordSerializer<IN2> inputRecordSerializer2; + if (enableWatermarkMultiplexing) { + inputRecordSerializer2 = new 
MultiplexingStreamRecordSerializer<IN2>(inputSerializer2); + } else { + inputRecordSerializer2 = new StreamRecordSerializer<IN2>(inputSerializer2); + } + this.deserializationDelegate2 = new NonReusingDeserializationDelegate(inputRecordSerializer2); + + // Initialize one deserializer per input channel + this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate + .getNumberOfInputChannels()]; + for (int i = 0; i < recordDeserializers.length; i++) { + recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer(); + } + + // determine which unioned channels belong to input 1 and which belong to input 2 + numInputChannels1 = 0; + for (InputGate gate: inputGates1) { + numInputChannels1 += gate.getNumberOfInputChannels(); + } + numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1; + + watermarks1 = new long[numInputChannels1]; + for (int i = 0; i < numInputChannels1; i++) { + watermarks1[i] = Long.MIN_VALUE; + } + lastEmittedWatermark1 = Long.MIN_VALUE; + + watermarks2 = new long[numInputChannels2]; + for (int i = 0; i < numInputChannels2; i++) { + watermarks2[i] = Long.MIN_VALUE; + } + lastEmittedWatermark2 = Long.MIN_VALUE; + } + + @SuppressWarnings("unchecked") + public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws Exception { + if (isFinished) { + return false; + } + + while (true) { + if (currentRecordDeserializer != null) { + DeserializationResult result; + if (currentChannel < numInputChannels1) { + result = currentRecordDeserializer.getNextRecord(deserializationDelegate1); + } else { + result = currentRecordDeserializer.getNextRecord(deserializationDelegate2); + } + + if (result.isBufferConsumed()) { + currentRecordDeserializer.getCurrentBuffer().recycle(); + currentRecordDeserializer = null; + } + + if (result.isFullRecord()) { + if (currentChannel < numInputChannels1) { + Object recordOrWatermark = deserializationDelegate1.getInstance(); + if (recordOrWatermark 
instanceof Watermark) { + handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel); + continue; + } else { + streamOperator.processElement1((StreamRecord<IN1>) deserializationDelegate1.getInstance()); + return true; + + } + } else { + Object recordOrWatermark = deserializationDelegate2.getInstance(); + if (recordOrWatermark instanceof Watermark) { + handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel); + continue; + } else { + streamOperator.processElement2((StreamRecord<IN2>) deserializationDelegate2.getInstance()); + return true; + } + } + } + } + + final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked(); + + if (bufferOrEvent.isBuffer()) { + currentChannel = bufferOrEvent.getChannelIndex(); + currentRecordDeserializer = recordDeserializers[currentChannel]; + currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); + + } else { + // Event received + final AbstractEvent event = bufferOrEvent.getEvent(); + + if (event instanceof CheckpointBarrier) { + barrierBuffer.processBarrier(bufferOrEvent); + } else { + if (handleEvent(event)) { + if (inputGate.isFinished()) { + if (!barrierBuffer.isEmpty()) { + throw new RuntimeException("BarrierBuffer should be empty at this point"); + } + isFinished = true; + return false; + } else if (hasReachedEndOfSuperstep()) { + return false; + } // else: More data is coming... 
+ } + } + } + } + } + + private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Watermark mark, int channelIndex) throws Exception { + if (channelIndex < numInputChannels1) { + long watermarkMillis = mark.getTimestamp(); + if (watermarkMillis > watermarks1[channelIndex]) { + watermarks1[channelIndex] = watermarkMillis; + long newMinWatermark = Long.MAX_VALUE; + for (long aWatermarks1 : watermarks1) { + if (aWatermarks1 < newMinWatermark) { + newMinWatermark = aWatermarks1; + } + } + if (newMinWatermark > lastEmittedWatermark1) { + lastEmittedWatermark1 = newMinWatermark; + operator.processWatermark1(new Watermark(lastEmittedWatermark1)); + } + } + } else { + channelIndex = channelIndex - numInputChannels1; + long watermarkMillis = mark.getTimestamp(); + if (watermarkMillis > watermarks2[channelIndex]) { + watermarks2[channelIndex] = watermarkMillis; + long newMinWatermark = Long.MAX_VALUE; + for (long aWatermarks2 : watermarks2) { + if (aWatermarks2 < newMinWatermark) { + newMinWatermark = aWatermarks2; + } + } + if (newMinWatermark > lastEmittedWatermark2) { + lastEmittedWatermark2 = newMinWatermark; + operator.processWatermark2(new Watermark(lastEmittedWatermark2)); + } + } + } + + } + + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + for (RecordDeserializer<?> deserializer : recordDeserializers) { + deserializer.setReporter(reporter); + } + } + + public void clearBuffers() { + for (RecordDeserializer<?> deserializer : recordDeserializers) { + Buffer buffer = deserializer.getCurrentBuffer(); + if (buffer != null && !buffer.isRecycled()) { + buffer.recycle(); + } + } + } + + public void cleanup() throws IOException { + barrierBuffer.cleanup(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java ---------------------------------------------------------------------- 
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java deleted file mode 100644 index 44f9a86..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.io; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; -import org.apache.flink.runtime.event.task.AbstractEvent; -import org.apache.flink.runtime.io.network.api.reader.AbstractReader; -import org.apache.flink.runtime.io.network.api.reader.ReaderBase; -import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; -import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A record-oriented reader. - * <p> - * This abstract base class is used by both the mutable and immutable record - * readers. - * - * @param <T> - * The type of the record that can be read with this record reader. 
- */ -public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends - AbstractReader implements ReaderBase, StreamingReader { - - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class); - - private final RecordDeserializer<T>[] recordDeserializers; - - private RecordDeserializer<T> currentRecordDeserializer; - - private boolean isFinished; - - private final BarrierBuffer barrierBuffer; - - - @SuppressWarnings("unchecked") - protected StreamingAbstractRecordReader(InputGate inputGate) { - super(inputGate); - barrierBuffer = new BarrierBuffer(inputGate, this); - - // Initialize one deserializer per input channel - this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate - .getNumberOfInputChannels()]; - for (int i = 0; i < recordDeserializers.length; i++) { - recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(); - } - } - - protected boolean getNextRecord(T target) throws IOException, InterruptedException { - if (isFinished) { - return false; - } - - while (true) { - if (currentRecordDeserializer != null) { - DeserializationResult result = currentRecordDeserializer.getNextRecord(target); - - if (result.isBufferConsumed()) { - Buffer currentBuffer = currentRecordDeserializer.getCurrentBuffer(); - currentBuffer.recycle(); - currentRecordDeserializer = null; - } - - if (result.isFullRecord()) { - return true; - } - } - - final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked(); - - if (bufferOrEvent.isBuffer()) { - currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()]; - currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); - } else { - // Event received - final AbstractEvent event = bufferOrEvent.getEvent(); - - if (event instanceof StreamingSuperstep) { - barrierBuffer.processSuperstep(bufferOrEvent); - } else { - if (handleEvent(event)) { - if (inputGate.isFinished()) 
{ - if (!barrierBuffer.isEmpty()) { - throw new RuntimeException( - "BarrierBuffer should be empty at this point"); - } - isFinished = true; - return false; - } else if (hasReachedEndOfSuperstep()) { - return false; - } // else: More data is coming... - } - } - } - } - } - - public void clearBuffers() { - for (RecordDeserializer<?> deserializer : recordDeserializers) { - Buffer buffer = deserializer.getCurrentBuffer(); - if (buffer != null && !buffer.isRecycled()) { - buffer.recycle(); - } - } - } - - public void cleanup() throws IOException { - barrierBuffer.cleanup(); - } - - @Override - public void setReporter(AccumulatorRegistry.Reporter reporter) { - for (RecordDeserializer<?> deserializer : recordDeserializers) { - deserializer.setReporter(reporter); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java deleted file mode 100644 index 1356af5..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.api.reader.MutableReader; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; - -public class StreamingMutableRecordReader<T extends IOReadableWritable> extends - StreamingAbstractRecordReader<T> implements MutableReader<T> { - - public StreamingMutableRecordReader(InputGate inputGate) { - super(inputGate); - } - - @Override - public boolean next(final T target) throws IOException, InterruptedException { - return getNextRecord(target); - } - - @Override - public void clearBuffers() { - super.clearBuffers(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java index 75867cd..6c40c03 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java @@ -44,10 +44,14 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> { } @Override - public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, - int numberOfOutputChannels) { - - K key = record.getInstance().getKey(keySelector); + public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) { + + K key = null; + try { + key = keySelector.getKey(record.getInstance().getValue()); + } catch (Exception e) { + throw new RuntimeException("Could not extract key from " + record.getInstance(), e); + } returnArray[0] = partitioner.partition(key, numberOfOutputChannels); http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java index 08c431b..7026d45 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java @@ -42,8 +42,13 @@ public class FieldsPartitioner<T> extends StreamPartitioner<T> { @Override public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) { - returnArray[0] = Math.abs(record.getInstance().getKey(keySelector).hashCode() - % numberOfOutputChannels); + Object key; + try { + key = 
keySelector.getKey(record.getInstance().getValue()); + } catch (Exception e) { + throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); + } + returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels); return returnArray; } http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java new file mode 100644 index 0000000..715f0d2 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.streaming.api.watermark.Watermark; + +import java.io.IOException; + +/** + * Serializer for {@link StreamRecord} and {@link Watermark}. This does not behave like a normal + * {@link TypeSerializer}, instead, this is only used at the + * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} level for transmitting + * {@link StreamRecord StreamRecords} and {@link Watermark Watermarks}. This serializer + * can handle both of them, therefore it returns {@link Object} the result has + * to be cast to the correct type. + * + * @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} + */ +public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSerializer<T> { + + private final long IS_WATERMARK = Long.MIN_VALUE; + + private static final long serialVersionUID = 1L; + + public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) { + super(serializer); + if (serializer instanceof MultiplexingStreamRecordSerializer) { + throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); + } + } + + @Override + @SuppressWarnings("unchecked") + public Object copy(Object from) { + // we can reuse the timestamp since Instant is immutable + if (from instanceof StreamRecord) { + StreamRecord<T> fromRecord = (StreamRecord<T>) from; + return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp()); + } else if (from instanceof Watermark) { + // is immutable + return from; + } else { + throw new RuntimeException("Cannot copy " + from); + } + } + + @Override + @SuppressWarnings("unchecked") + public Object copy(Object from, Object reuse) { + if (from 
instanceof StreamRecord && reuse instanceof StreamRecord) { + StreamRecord<T> fromRecord = (StreamRecord<T>) from; + StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse; + + reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), fromRecord.getTimestamp()); + return reuse; + } else if (from instanceof Watermark) { + // is immutable + return from; + } else { + throw new RuntimeException("Cannot copy " + from); + } + } + + @Override + @SuppressWarnings("unchecked") + public void serialize(Object value, DataOutputView target) throws IOException { + if (value instanceof StreamRecord) { + StreamRecord<T> record = (StreamRecord<T>) value; + target.writeLong(record.getTimestamp()); + typeSerializer.serialize(record.getValue(), target); + } else if (value instanceof Watermark) { + target.writeLong(IS_WATERMARK); + target.writeLong(((Watermark) value).getTimestamp()); + } + } + + @Override + public Object deserialize(DataInputView source) throws IOException { + long millis = source.readLong(); + + if (millis == IS_WATERMARK) { + return new Watermark(source.readLong()); + } else { + T element = typeSerializer.deserialize(source); + return new StreamRecord<T>(element, millis); + } + } + + @Override + @SuppressWarnings("unchecked") + public Object deserialize(Object reuse, DataInputView source) throws IOException { + long millis = source.readLong(); + + if (millis == IS_WATERMARK) { + return new Watermark(source.readLong()); + + } else { + StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse; + T element = typeSerializer.deserialize(reuseRecord.getValue(), source); + reuseRecord.replace(element, millis); + return reuse; + } + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + long millis = source.readLong(); + target.writeLong(millis); + + if (millis == IS_WATERMARK) { + target.writeLong(source.readLong()); + } else { + typeSerializer.copy(source, target); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java index 66a64b3..aff030e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java @@ -17,87 +17,106 @@ package org.apache.flink.streaming.runtime.streamrecord; -import java.io.Serializable; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple; - /** - * Object for wrapping a tuple or other object with ID used for sending records - * between streaming task in Apache Flink stream processing. + * One value in a data stream. This stores the value and the associated timestamp. */ -public class StreamRecord<T> implements Serializable { - private static final long serialVersionUID = 1L; +public class StreamRecord<T> { - private T streamObject; - public boolean isTuple; + // We store it as Object so that we can reuse a StreamElement for emitting + // elements of a different type while still reusing the timestamp. + private Object value; + private long timestamp; /** - * Creates an empty StreamRecord + * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the + * result of {@code new Instant(0)}. 
*/ - public StreamRecord() { + public StreamRecord(T value) { + this(value, Long.MIN_VALUE + 1); + // be careful to set it to MIN_VALUE + 1, because MIN_VALUE is reserved as the + // special tag to signify that a transmitted element is a Watermark in StreamRecordSerializer } /** - * Gets the wrapped object from the StreamRecord - * - * @return The object wrapped + * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the + * given timestamp. + * + * @param value The value to wrap in this {@link StreamRecord} + * @param timestamp The timestamp in milliseconds */ - public T getObject() { - return streamObject; + public StreamRecord(T value, long timestamp) { + this.value = value; + this.timestamp = timestamp; } /** - * Gets the field of the contained object at the given position. If a tuple - * is wrapped then the getField method is invoked. If the StreamRecord - * contains and object of Basic types only position 0 could be returned. - * - * @param pos - * Position of the field to get. - * @return Returns the object contained in the position. + * Returns the value wrapped in this stream value. */ - public Object getField(int pos) { - if (isTuple) { - return ((Tuple) streamObject).getField(pos); - } else { - if (pos == 0) { - return streamObject; - } else { - throw new IndexOutOfBoundsException(); - } - } + @SuppressWarnings("unchecked") + public T getValue() { + return (T) value; } /** - * Extracts key for the stored object using the keySelector provided. - * - * @param keySelector - * KeySelector for extracting the key - * @return The extracted key + * Returns the timestamp associated with this stream value in milliseconds. 
*/ - public <R> R getKey(KeySelector<T, R> keySelector) { - try { - return keySelector.getKey(streamObject); - } catch (Exception e) { - throw new RuntimeException("Failed to extract key: " + e.getMessage()); - } + public long getTimestamp() { + return timestamp; + } + + /** + * Replace the currently stored value by the given new value. This returns a StreamElement + * with the generic type parameter that matches the new value while keeping the old + * timestamp. + * + * @param element Element to set in this stream value + * @return Returns the StreamElement with replaced value + */ + @SuppressWarnings("unchecked") + public <X> StreamRecord<X> replace(X element) { + this.value = element; + return (StreamRecord<X>) this; } /** - * Sets the object stored - * - * @param object - * Object to set - * @return Returns the StreamRecord object + * Replace the currently stored value by the given new value and the currently stored + * timestamp with the new timestamp. This returns a StreamElement with the generic type + * parameter that matches the new value. + * + * @param value The new value to wrap in this {@link StreamRecord} + * @param timestamp The new timestamp in milliseconds + * @return Returns the StreamElement with replaced value */ - public StreamRecord<T> setObject(T object) { - this.streamObject = object; - return this; + @SuppressWarnings("unchecked") + public <X> StreamRecord<X> replace(X value, long timestamp) { + this.timestamp = timestamp; + this.value = value; + return (StreamRecord<X>) this; } @Override - public String toString() { - return streamObject.toString(); + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StreamRecord that = (StreamRecord) o; + + return value.equals(that.value) && timestamp == that.timestamp; } + @Override + public int hashCode() { + int result = value != null ? 
value.hashCode() : 0; + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + @Override + public String toString() { + return "Record{" + value + "; " + timestamp + '}'; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java index 4499499..b05eb36 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java @@ -20,26 +20,35 @@ package org.apache.flink.streaming.runtime.streamrecord; import java.io.IOException; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> { +/** + * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with + * the element. + * + * <p> + * {@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also + * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same + * stream with {@link StreamRecord StreamRecords}. 
+ * + * @see MultiplexingStreamRecordSerializer + * + * @param <T> The type of value in the {@link StreamRecord} + */ +public class StreamRecordSerializer<T> extends TypeSerializer<Object> { private static final long serialVersionUID = 1L; - private final TypeSerializer<T> typeSerializer; - private final boolean isTuple; + protected final TypeSerializer<T> typeSerializer; - public StreamRecordSerializer(TypeInformation<T> typeInfo, ExecutionConfig executionConfig) { - this.typeSerializer = typeInfo.createSerializer(executionConfig); - this.isTuple = typeInfo.isTupleType(); - } - - public TypeSerializer<T> getObjectSerializer() { - return typeSerializer; + public StreamRecordSerializer(TypeSerializer<T> serializer) { + if (serializer instanceof StreamRecordSerializer) { + throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); + } + this.typeSerializer = Preconditions.checkNotNull(serializer); } @Override @@ -48,34 +57,34 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord } @Override - public StreamRecordSerializer<T> duplicate() { + @SuppressWarnings("unchecked") + public TypeSerializer duplicate() { return this; } @Override - public StreamRecord<T> createInstance() { + public Object createInstance() { try { - StreamRecord<T> t = new StreamRecord<T>(); - t.isTuple = isTuple; - t.setObject(typeSerializer.createInstance()); - return t; + return new StreamRecord<T>(typeSerializer.createInstance()); } catch (Exception e) { throw new RuntimeException("Cannot instantiate StreamRecord.", e); } } @Override - public StreamRecord<T> copy(StreamRecord<T> from) { - StreamRecord<T> rec = new StreamRecord<T>(); - rec.isTuple = from.isTuple; - rec.setObject(typeSerializer.copy(from.getObject())); - return rec; + @SuppressWarnings("unchecked") + public Object copy(Object from) { + StreamRecord<T> fromRecord = (StreamRecord<T>) from; + return new 
StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp()); } @Override - public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) { - reuse.isTuple = from.isTuple; - reuse.setObject(typeSerializer.copy(from.getObject(), reuse.getObject())); + @SuppressWarnings("unchecked") + public Object copy(Object from, Object reuse) { + StreamRecord<T> fromRecord = (StreamRecord<T>) from; + StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse; + + reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), 0); return reuse; } @@ -85,26 +94,29 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord } @Override - public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException { - typeSerializer.serialize(value.getObject(), target); + @SuppressWarnings("unchecked") + public void serialize(Object value, DataOutputView target) throws IOException { + StreamRecord<T> record = (StreamRecord<T>) value; + typeSerializer.serialize(record.getValue(), target); } @Override - public StreamRecord<T> deserialize(DataInputView source) throws IOException { - StreamRecord<T> record = new StreamRecord<T>(); - record.isTuple = this.isTuple; - record.setObject(typeSerializer.deserialize(source)); - return record; + public Object deserialize(DataInputView source) throws IOException { + T element = typeSerializer.deserialize(source); + return new StreamRecord<T>(element, 0); } @Override - public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException { - reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source)); + @SuppressWarnings("unchecked") + public Object deserialize(Object reuse, DataInputView source) throws IOException { + StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse; + T element = typeSerializer.deserialize(reuseRecord.getValue(), source); + reuseRecord.replace(element, 0); return reuse; } @Override public void 
copy(DataInputView source, DataOutputView target) throws IOException { - // Needs to be implemented + typeSerializer.copy(source, target); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java new file mode 100644 index 0000000..d94b5b4 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import java.io.IOException; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.event.task.TaskEvent; + +/** + * Checkpoint barriers are used to synchronize checkpoints throughout the streaming topology. The + * barriers are emitted by the sources when instructed to do so by the JobManager. When + * operators receive a {@link CheckpointBarrier} on one of its inputs it must block processing + * of further elements on this input until all inputs received the checkpoint barrier + * corresponding to to that checkpoint. Once all inputs received the checkpoint barrier for + * a checkpoint the operator is to perform the checkpoint and then broadcast the barrier to + * downstream operators. + * + * <p> + * The checkpoint barrier IDs are advancing. Once an operator receives a {@link CheckpointBarrier} + * for a checkpoint with a higher id it is to discard all barriers that it received from previous + * checkpoints and unblock all other inputs. 
+ */ +public class CheckpointBarrier extends TaskEvent { + + protected long id; + protected long timestamp; + + public CheckpointBarrier() {} + + public CheckpointBarrier(long id, long timestamp) { + this.id = id; + this.timestamp = timestamp; + } + + public long getId() { + return id; + } + + public long getTimestamp() { + return id; + } + + // ------------------------------------------------------------------------ + + @Override + public void write(DataOutputView out) throws IOException { + out.writeLong(id); + out.writeLong(timestamp); + } + + @Override + public void read(DataInputView in) throws IOException { + id = in.readLong(); + timestamp = in.readLong(); + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32)); + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof CheckpointBarrier)) { + return false; + } + else { + CheckpointBarrier that = (CheckpointBarrier) other; + return that.id == this.id && that.timestamp == this.timestamp; + } + } + + @Override + public String toString() { + return String.format("CheckpointBarrier %d @ %d", id, timestamp); + } +}