Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15576614
  
    --- Diff: 
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
 ---
    @@ -0,0 +1,700 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *
    + */
    +
    +package org.apache.flink.streaming.api;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.flink.api.java.functions.FilterFunction;
    +import org.apache.flink.api.java.functions.FlatMapFunction;
    +import org.apache.flink.api.java.functions.GroupReduceFunction;
    +import org.apache.flink.api.java.functions.MapFunction;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import 
org.apache.flink.streaming.api.StreamExecutionEnvironment.ConnectionType;
    +import org.apache.flink.streaming.api.collector.OutputSelector;
    +import org.apache.flink.streaming.api.function.co.CoMapFunction;
    +import org.apache.flink.streaming.api.function.sink.SinkFunction;
    +import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
    +import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
    +import 
org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
    +import 
org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
    +
    +/**
    + * A DataStream represents a stream of elements of the same type. A 
DataStream
    + * can be transformed into another DataStream by applying a transformation
    + * such as
    + * <ul>
    + * <li>{@link DataStream#map},</li>
    + * <li>{@link DataStream#filter}, or</li>
    + * <li>{@link DataStream#batchReduce}.</li>
    + * </ul>
    + * 
    + * @param <T>
    + *            The type of the DataStream, i.e., the type of the elements 
of the
    + *            DataStream.
    + */
    +public class DataStream<T extends Tuple> {
    +
    +   protected static Integer counter = 0;
    +   protected final StreamExecutionEnvironment environment;
    +   protected String id;
    +   protected int degreeOfParallelism;
    +   protected String userDefinedName;
    +   protected OutputSelector<T> outputSelector;
    +   protected List<String> connectIDs;
    +   protected List<ConnectionType> ctypes;
    +   protected List<Integer> cparams;
    +   protected boolean iterationflag;
    +   protected Integer iterationID;
    +
    +   /**
    +    * Create a new {@link DataStream} in the given execution environment 
with
    +    * partitioning set to shuffle by default.
    +    * 
    +    * @param environment
    +    *            StreamExecutionEnvironment
    +    * @param operatorType
    +    *            The type of the operator in the component
    +    */
    +   protected DataStream(StreamExecutionEnvironment environment, String 
operatorType) {
    +           if (environment == null) {
    +                   throw new NullPointerException("context is null");
    +           }
    +
    +           // TODO add name based on component number and preferably a 
sequential id
    +           counter++;
    +           this.id = operatorType + "-" + counter.toString();
    +           this.environment = environment;
    +           this.degreeOfParallelism = environment.getDegreeOfParallelism();
    +           initConnections();
    +
    +   }
    +
    +   /**
    +    * Create a new DataStream by creating a copy of another DataStream
    +    * 
    +    * @param dataStream
    +    *            The DataStream that will be copied.
    +    */
    +   protected DataStream(DataStream<T> dataStream) {
    +           this.environment = dataStream.environment;
    +           this.id = dataStream.id;
    +           this.degreeOfParallelism = dataStream.degreeOfParallelism;
    +           this.userDefinedName = dataStream.userDefinedName;
    +           this.outputSelector = dataStream.outputSelector;
    +           this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
    +           this.ctypes = new 
ArrayList<StreamExecutionEnvironment.ConnectionType>(dataStream.ctypes);
    +           this.cparams = new ArrayList<Integer>(dataStream.cparams);
    +           this.iterationflag = dataStream.iterationflag;
    +           this.iterationID = dataStream.iterationID;
    +   }
    +
    +   /**
    +    * Initialize the connection and partitioning among the connected
    +    * {@link DataStream}s.
    +    */
    +   private void initConnections() {
    +           connectIDs = new ArrayList<String>();
    +           connectIDs.add(getId());
    +           ctypes = new 
ArrayList<StreamExecutionEnvironment.ConnectionType>();
    +           ctypes.add(ConnectionType.SHUFFLE);
    +           cparams = new ArrayList<Integer>();
    +           cparams.add(0);
    +
    +   }
    +
    +   /**
    +    * Returns the ID of the {@link DataStream}.
    +    * 
    +    * @return ID of the DataStream
    +    */
    +   public String getId() {
    +           return id;
    +   }
    +
    +   /**
    +    * Sets the mutability of the operator represented by the DataStream. 
If the
    +    * operator is set to mutable, the tuples received in the user defined
    +    * functions will be reused after the function call. Setting an 
operator to
    +    * mutable greatly reduces garbage collection overhead and thus improves 
scalability.
    +    * 
    +    * @param isMutable
    +    *            The mutability of the operator.
    +    * @return The DataStream with mutability set.
    +    */
    +   public DataStream<T> setMutability(boolean isMutable) {
    +           environment.setMutability(this, isMutable);
    +           return this;
    +   }
    +
    +   /**
    +    * Sets the degree of parallelism for this operator. The degree must be 
1 or
    +    * more.
    +    * 
    +    * @param dop
    +    *            The degree of parallelism for this operator.
    +    * @return The operator with set degree of parallelism.
    +    */
    +   public DataStream<T> setParallelism(int dop) {
    +           if (dop < 1) {
    +                   throw new IllegalArgumentException("The parallelism of 
an operator must be at least 1.");
    +           }
    +           this.degreeOfParallelism = dop;
    +
    +           environment.setOperatorParallelism(this);
    +
    +           return new DataStream<T>(this);
    +
    +   }
    +
    +   /**
    +    * Gets the degree of parallelism for this operator.
    +    * 
    +    * @return The parallelism set for this operator.
    +    */
    +   public int getParallelism() {
    +           return this.degreeOfParallelism;
    +   }
    +
    +   /**
    +    * Gives the data transformation (vertex) a user-defined name so that it 
can be used
    +    * at directed outputs. The {@link OutputSelector} of the input vertex
    +    * should use this name for directed emits.
    +    * 
    +    * @param name
    +    *            The name to set
    +    * @return The named DataStream.
    +    */
    +   public DataStream<T> name(String name) {
    +           // copy?
    +           if (name == "") {
    --- End diff --
    
    Sure, pesky mistake. Thanks. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to