Github user ktzoumas commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1215#discussion_r41125422
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -24,49 +24,169 @@
     import org.apache.flink.api.java.functions.KeySelector;
     import org.apache.flink.api.java.typeutils.TypeExtractor;
     import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
    -import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
     import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
     import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
    +import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
     import org.apache.flink.streaming.api.operators.StreamGroupedFold;
     import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
    +import 
org.apache.flink.streaming.api.transformations.OneInputTransformation;
    +import 
org.apache.flink.streaming.api.transformations.PartitionTransformation;
    +import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    +import 
org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
    +import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import 
org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    +import org.apache.flink.streaming.api.windowing.time.AbstractTime;
    +import org.apache.flink.streaming.api.windowing.time.EventTime;
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    +import org.apache.flink.streaming.api.windowing.windows.Window;
    +import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
    +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
     
     /**
    - * A GroupedDataStream represents a {@link DataStream} which has been
    - * partitioned by the given {@link KeySelector}. Operators like {@link 
#reduce},
    - * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
    - * get additional functionality by the grouping.
    + * A {@code KeyedStream} represents a {@link DataStream} on which operator 
state is
    + * partitioned by key using a provided {@link KeySelector}. Typical 
operations supported by a
    + * {@code DataStream} are also possible on a {@code KeyedStream}, with the 
exception of
    + * partitioning methods such as shuffle, forward and keyBy.
      *
    - * @param <T> The type of the elements in the Grouped Stream.
    + * <p>
    + * Reduce-style operations, such as {@link #reduce}, {@link #sum} and 
{@link #fold} work on elements
    + * that have the same key.
    + *
    + * @param <T> The type of the elements in the Keyed Stream.
      * @param <KEY> The type of the key in the Keyed Stream.
      */
    -public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> {
    +public class KeyedStream<T, KEY> extends DataStream<T> {
    +   
    +   protected final KeySelector<T, KEY> keySelector;
    +
    +   /**
    +    * Creates a new {@link KeyedStream} using the given {@link KeySelector}
    +    * to partition operator state by key.
    +    * 
    +    * @param dataStream
    +    *            Base stream of data
    +    * @param keySelector
    +    *            Function for determining state partitions
    +    */
    +   public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> 
keySelector) {
    +           super(dataStream.getExecutionEnvironment(), new 
PartitionTransformation<>(dataStream.getTransformation(), new 
HashPartitioner<>(keySelector)));
    +           this.keySelector = keySelector;
    +   }
    +
    +   
    +   public KeySelector<T, KEY> getKeySelector() {
    +           return this.keySelector;
    +   }
    +
    +   
    +   @Override
    +   protected DataStream<T> setConnectionType(StreamPartitioner<T> 
partitioner) {
    +           throw new UnsupportedOperationException("Cannot override 
partitioning for KeyedStream.");
    +   }
    +
    +   
    +   @Override
    +   public <R> SingleOutputStreamOperator<R, ?> transform(String 
operatorName,
    +                   TypeInformation<R> outTypeInfo, 
OneInputStreamOperator<T, R> operator) {
    +
    +           SingleOutputStreamOperator<R, ?> returnStream = 
super.transform(operatorName, outTypeInfo,operator);
    +
    +           ((OneInputTransformation<T, R>) 
returnStream.getTransformation()).setStateKeySelector(keySelector);
    +           return returnStream;
    +   }
    +
    +   
    +   
    +   @Override
    +   public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
    +           DataStreamSink<T> result = super.addSink(sinkFunction);
    +           result.getTransformation().setStateKeySelector(keySelector);
    +           return result;
    +   }
    +   
    +   // 
------------------------------------------------------------------------
    +   //  Windowing
    +   // 
------------------------------------------------------------------------
     
        /**
    -    * Creates a new {@link GroupedDataStream}, group inclusion is 
determined using
    -    * a {@link KeySelector} on the elements of the {@link DataStream}.
    +    * Windows this {@code KeyedStream} into tumbling time windows.
         *
    -    * @param dataStream Base stream of data
    -    * @param keySelector Function for determining group inclusion
    +    * <p>
    +    * This is a shortcut for either {@code 
.window(TumblingTimeWindows.of(size))} or
    +    * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on 
the time characteristic
    +    * set using
    +    * {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
    +    *
    +    * @param size The size of the window.
    +    */
    +   public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) 
{
    +           AbstractTime actualSize = 
size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
    +
    +           if (actualSize instanceof EventTime) {
    +                   return window(TumblingTimeWindows.of(actualSize));
    +           } else {
    +                   return 
window(TumblingProcessingTimeWindows.of(actualSize));
    +           }
    +   }
    +
    +   /**
    +    * Windows this {@code KeyedStream} into sliding time windows.
    +    *
    +    * <p>
    +    * This is a shortcut for either {@code 
.window(SlidingTimeWindows.of(size, slide))} or
    +    * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} 
depending on the time characteristic
    +    * set using
    +    * {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
    +    *
    +    * @param size The size of the window.
         */
    -   public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> 
keySelector) {
    -           super(dataStream, keySelector);
    +   public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, 
AbstractTime slide) {
    +           AbstractTime actualSize = 
size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
    +           AbstractTime actualSlide = 
slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
    +
    +           if (actualSize instanceof EventTime) {
    +                   return window(SlidingTimeWindows.of(actualSize, 
actualSlide));
    +           } else {
    +                   return 
window(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
    +           }
    +   }
    +
    +   /**
    +    * Windows this data stream to a {@code WindowedStream}, which 
evaluates windows
    +    * over a key grouped stream. Elements are put into windows by a {@link 
WindowAssigner}. The
    +    * grouping of elements is done both by key and by window.
    +    *
    +    * <p>
    +    * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} 
can be defined to specify
    +    * when windows are evaluated. However, {@code WindowAssigners} have a 
default {@code Trigger}
    +    * that is used if a {@code Trigger} is not specified.
    +    *
    +    * @param assigner The {@code WindowAssigner} that assigns elements to 
windows.
    +    * @return The trigger windows data stream.
    +    */
    +   public <W extends Window> WindowedStream<T, KEY, W> 
window(WindowAssigner<? super T, W> assigner) {
    +           return new WindowedStream<>(this, assigner);
        }
     
    +   // 
------------------------------------------------------------------------
    +   //  Non-Windowed aggregation operations
    +   // 
------------------------------------------------------------------------
     
        /**
         * Applies a reduce transformation on the grouped data stream grouped 
on by
         * the given key position. The {@link ReduceFunction} will receive input
         * values based on the key value. Only input values with the same key 
will
         * go to the same reducer.
    -    * 
    +    *
         * @param reducer
         *            The {@link ReduceFunction} that will be called for every
         *            element of the input values with the same key.
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> 
reducer) {
    -           return transform("Grouped Reduce", getType(), new 
StreamGroupedReduce<T>(
    -                           clean(reducer), keySelector));
    +           return transform("Grouped Reduce", getType(), new 
StreamGroupedReduce<>(clean(reducer), keySelector));
    --- End diff --
    
    Should the operator name passed to transform() still be "Grouped Reduce", or simply "Reduce"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to