Hi, Milinda,

That was an accidental mistake. I have reverted the check-in. I am still
working on that. Thanks!

-Yi

On Mon, Jun 1, 2015 at 9:34 PM, Milinda Pathirage <mpath...@umail.iu.edu>
wrote:

> Hi Navina,
>
> Did we decide to push this patch to the samza-sql branch? I thought Yi was
> still working on this. Some Git conflict markers are still present in
> this commit.
>
> +<<<<<<< HEAD
> +   * The callback object
> +=======
> +   * The callback function
> +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of
> callbacks w/o inheriting and creating many sub-classes from operators
>
> Milinda
>
> On Mon, Jun 1, 2015 at 9:06 PM, <nav...@apache.org> wrote:
>
> > Yi's TopologyBuilder RB 34500
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/samza/repo
> > Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/45b85477
> > Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/45b85477
> > Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/45b85477
> >
> > Branch: refs/heads/samza-sql
> > Commit: 45b854772cf36cc69e8d8cda7a51bce1be5fe576
> > Parents: 41c4cd0
> > Author: Navina <navi.trin...@gmail.com>
> > Authored: Thu May 28 18:51:30 2015 -0700
> > Committer: Navina <navi.trin...@gmail.com>
> > Committed: Thu May 28 18:51:30 2015 -0700
> >
> > ----------------------------------------------------------------------
> >  .../apache/samza/sql/api/data/EntityName.java   |  41 ++-
> >  .../org/apache/samza/sql/api/data/Table.java    |   7 +-
> >  .../samza/sql/api/operators/Operator.java       |   4 +
> >  .../sql/api/operators/OperatorCallback.java     |   1 -
> >  .../samza/sql/api/operators/OperatorRouter.java |   8 +
> >  .../samza/sql/api/operators/OperatorSink.java   |  30 ++
> >  .../samza/sql/api/operators/OperatorSource.java |  30 ++
> >  .../samza/sql/api/operators/SimpleOperator.java |   3 +-
> >  .../samza/sql/data/IncomingMessageTuple.java    |   1 -
> >  .../sql/operators/NoopOperatorCallback.java     |  53 ++++
> >  .../samza/sql/operators/OperatorTopology.java   |  53 ++++
> >  .../samza/sql/operators/SimpleOperatorImpl.java | 147 ++++++++++
> >  .../samza/sql/operators/SimpleOperatorSpec.java | 106 +++++++
> >  .../samza/sql/operators/SimpleRouter.java       | 141 +++++++++
> >  .../operators/factory/NoopOperatorCallback.java |  50 ----
> >  .../operators/factory/SimpleOperatorImpl.java   | 136 ---------
> >  .../operators/factory/SimpleOperatorSpec.java   | 106 -------
> >  .../sql/operators/factory/SimpleRouter.java     | 136 ---------
> >  .../sql/operators/factory/TopologyBuilder.java  | 284
> +++++++++++++++++++
> >  .../sql/operators/join/StreamStreamJoin.java    |   3 +-
> >  .../operators/join/StreamStreamJoinSpec.java    |  15 +-
> >  .../sql/operators/partition/PartitionOp.java    |   3 +-
> >  .../sql/operators/partition/PartitionSpec.java  |   2 +-
> >  .../sql/operators/window/BoundedTimeWindow.java |   4 +-
> >  .../samza/sql/operators/window/WindowSpec.java  |   7 +-
> >  .../samza/task/sql/SimpleMessageCollector.java  |  37 ++-
> >  .../task/sql/RandomWindowOperatorTask.java      |  11 +-
> >  .../apache/samza/task/sql/StreamSqlTask.java    |  26 +-
> >  .../samza/task/sql/UserCallbacksSqlTask.java    |  66 ++---
> >  29 files changed, 991 insertions(+), 520 deletions(-)
> > ----------------------------------------------------------------------
> >
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> > index 80ba455..df1b11b 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java
> > @@ -49,6 +49,8 @@ public class EntityName {
> >     */
> >    private final String name;
> >
> > +  private final boolean isSystemEntity;
> > +
> >    /**
> >     * Static map of already allocated table names
> >     */
> > @@ -59,15 +61,19 @@ public class EntityName {
> >     */
> >    private static Map<String, EntityName> streams = new HashMap<String,
> > EntityName>();
> >
> > +  private static final String ANONYMOUS = "anonymous";
> > +
> >    /**
> >     * Private ctor to create entity names
> >     *
> >     * @param type Type of the entity name
> >     * @param name Formatted name of the entity
> > +   * @param isSystemEntity whether the entity is a system input/output
> >     */
> > -  private EntityName(EntityType type, String name) {
> > +  private EntityName(EntityType type, String name, boolean
> > isSystemEntity) {
> >      this.type = type;
> >      this.name = name;
> > +    this.isSystemEntity = isSystemEntity;
> >    }
> >
> >    @Override
> > @@ -102,6 +108,10 @@ public class EntityName {
> >      return this.type.equals(EntityType.STREAM);
> >    }
> >
> > +  public boolean isSystemEntity() {
> > +    return this.isSystemEntity;
> > +  }
> > +
> >    /**
> >     * Get the formatted entity name
> >     *
> > @@ -111,15 +121,24 @@ public class EntityName {
> >      return this.name;
> >    }
> >
> > +  public static EntityName getTableName(String name) {
> > +    return getTableName(name, false);
> > +  }
> > +
> > +  public static EntityName getStreamName(String name) {
> > +    return getStreamName(name, false);
> > +  }
> > +
> >    /**
> >     * Static method to get the instance of {@code EntityName} with type
> > {@code EntityType.TABLE}
> >     *
> >     * @param name The formatted entity name of the relation
> > +   * @param isSystem The boolean flag indicating whether this is a
> system
> > input/output
> >     * @return A <code>EntityName</code> for a relation
> >     */
> > -  public static EntityName getTableName(String name) {
> > +  public static EntityName getTableName(String name, boolean isSystem) {
> >      if (tables.get(name) == null) {
> > -      tables.put(name, new EntityName(EntityType.TABLE, name));
> > +      tables.put(name, new EntityName(EntityType.TABLE, name,
> isSystem));
> >      }
> >      return tables.get(name);
> >    }
> > @@ -128,13 +147,25 @@ public class EntityName {
> >     * Static method to get the instance of <code>EntityName</code> with
> > type <code>EntityType.STREAM</code>
> >     *
> >     * @param name The formatted entity name of the stream
> > +   * @param isSystem The boolean flag indicating whether this is a
> system
> > input/output
> >     * @return A <code>EntityName</code> for a stream
> >     */
> > -  public static EntityName getStreamName(String name) {
> > +  public static EntityName getStreamName(String name, boolean isSystem)
> {
> >      if (streams.get(name) == null) {
> > -      streams.put(name, new EntityName(EntityType.STREAM, name));
> > +      streams.put(name, new EntityName(EntityType.STREAM, name,
> > isSystem));
> >      }
> >      return streams.get(name);
> >    }
> >
> > +  public static EntityName getAnonymousStream() {
> > +    return getStreamName(ANONYMOUS);
> > +  }
> > +
> > +  public static EntityName getAnonymousTable() {
> > +    return getTableName(ANONYMOUS);
> > +  }
> > +
> > +  public boolean isAnonymous() {
> > +    return this.name.equals(ANONYMOUS);
> > +  }
> >  }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> > ----------------------------------------------------------------------
> > diff --git
> > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> > index 7b4d984..b4dce07 100644
> > ---
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> > +++
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java
> > @@ -19,6 +19,9 @@
> >
> >  package org.apache.samza.sql.api.data;
> >
> > +import java.util.List;
> > +
> > +
> >  /**
> >   * This interface defines a non-ordered {@link
> > org.apache.samza.sql.api.data.Relation}, which has a unique primary key
> >   *
> > @@ -31,8 +34,8 @@ public interface Table<K> extends Relation<K> {
> >    /**
> >     * Get the primary key field name for this table
> >     *
> > -   * @return The name of the primary key field
> > +   * @return The names of the primary key fields
> >     */
> > -  String getPrimaryKeyName();
> > +  List<String> getPrimaryKeyNames();
> >
> >  }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> > index d6f6b57..9c6eaa5 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java
> > @@ -27,7 +27,11 @@ import org.apache.samza.task.TaskContext;
> >  import org.apache.samza.task.TaskCoordinator;
> >
> >
> > +/**
> > + * This class defines the common interface for operator classes.
> > + */
> >  public interface Operator {
> > +
> >    /**
> >     * Method to initialize the operator
> >     *
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> > index fb2aa89..5a77d95 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
> > @@ -23,7 +23,6 @@ import org.apache.samza.sql.api.data.Tuple;
> >  import org.apache.samza.task.MessageCollector;
> >  import org.apache.samza.task.TaskCoordinator;
> >
> > -
> >  /**
> >   * Defines the callback functions to allow customized functions to be
> > invoked before process and before sending the result
> >   */
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> > index 0759638..432e6b3 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
> > @@ -19,6 +19,7 @@
> >
> >  package org.apache.samza.sql.api.operators;
> >
> > +import java.util.Iterator;
> >  import java.util.List;
> >
> >  import org.apache.samza.sql.api.data.EntityName;
> > @@ -51,4 +52,11 @@ public interface OperatorRouter extends Operator {
> >     */
> >    List<SimpleOperator> getNextOperators(EntityName output);
> >
> > +  /**
> > +   * This method provides an iterator to go through all operators
> > connected via {@code OperatorRouter}
> > +   *
> > +   * @return An {@link java.util.Iterator} for all operators connected
> > via {@code OperatorRouter}
> > +   */
> > +  Iterator<SimpleOperator> iterator();
> > +
> >  }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
> > new file mode 100644
> > index 0000000..e2c748c
> > --- /dev/null
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
> > @@ -0,0 +1,30 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one
> > + * or more contributor license agreements.  See the NOTICE file
> > + * distributed with this work for additional information
> > + * regarding copyright ownership.  The ASF licenses this file
> > + * to you under the Apache License, Version 2.0 (the
> > + * "License"); you may not use this file except in compliance
> > + * with the License.  You may obtain a copy of the License at
> > + *
> > + *   http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing,
> > + * software distributed under the License is distributed on an
> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > + * KIND, either express or implied.  See the License for the
> > + * specific language governing permissions and limitations
> > + * under the License.
> > + */
> > +package org.apache.samza.sql.api.operators;
> > +
> > +import java.util.Iterator;
> > +
> > +import org.apache.samza.sql.api.data.EntityName;
> > +
> > +
> > +public interface OperatorSink {
> > +  Iterator<SimpleOperator> opIterator();
> > +
> > +  EntityName getName();
> > +}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
> > new file mode 100644
> > index 0000000..860c1aa
> > --- /dev/null
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
> > @@ -0,0 +1,30 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one
> > + * or more contributor license agreements.  See the NOTICE file
> > + * distributed with this work for additional information
> > + * regarding copyright ownership.  The ASF licenses this file
> > + * to you under the Apache License, Version 2.0 (the
> > + * "License"); you may not use this file except in compliance
> > + * with the License.  You may obtain a copy of the License at
> > + *
> > + *   http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing,
> > + * software distributed under the License is distributed on an
> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > + * KIND, either express or implied.  See the License for the
> > + * specific language governing permissions and limitations
> > + * under the License.
> > + */
> > +package org.apache.samza.sql.api.operators;
> > +
> > +import java.util.Iterator;
> > +
> > +import org.apache.samza.sql.api.data.EntityName;
> > +
> > +
> > +public interface OperatorSource {
> > +  Iterator<SimpleOperator> opIterator();
> > +
> > +  EntityName getName();
> > +}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> > index c49a822..60ace9c 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
> > @@ -19,8 +19,6 @@
> >
> >  package org.apache.samza.sql.api.operators;
> >
> > -
> > -
> >  /**
> >   * The interface for a {@code SimpleOperator} that implements a simple
> > primitive relational logic operation
> >   */
> > @@ -31,4 +29,5 @@ public interface SimpleOperator extends Operator {
> >     * @return The {@link org.apache.samza.sql.api.operators.OperatorSpec}
> > object that defines the configuration/parameters of the operator
> >     */
> >    OperatorSpec getSpec();
> > +
> >  }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> > index 72a59f2..af040f0 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
> > @@ -81,7 +81,6 @@ public class IncomingMessageTuple implements Tuple {
> >
> >    @Override
> >    public long getCreateTimeNano() {
> > -    // TODO: this is wrong and just to keep as an placeholder. It should
> > be replaced by the message publish time when the publish timestamp is
> > available in the message metadata
> >      return this.recvTimeNano;
> >    }
> >
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
> > new file mode 100644
> > index 0000000..e951737
> > --- /dev/null
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java
> > @@ -0,0 +1,53 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one
> > + * or more contributor license agreements.  See the NOTICE file
> > + * distributed with this work for additional information
> > + * regarding copyright ownership.  The ASF licenses this file
> > + * to you under the Apache License, Version 2.0 (the
> > + * "License"); you may not use this file except in compliance
> > + * with the License.  You may obtain a copy of the License at
> > + *
> > + *   http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing,
> > + * software distributed under the License is distributed on an
> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > + * KIND, either express or implied.  See the License for the
> > + * specific language governing permissions and limitations
> > + * under the License.
> > + */
> > +package org.apache.samza.sql.operators;
> > +
> > +import org.apache.samza.sql.api.data.Relation;
> > +import org.apache.samza.sql.api.data.Tuple;
> > +import org.apache.samza.sql.api.operators.OperatorCallback;
> > +import org.apache.samza.task.MessageCollector;
> > +import org.apache.samza.task.TaskCoordinator;
> > +
> > +
> > +/**
> > + * This is a default NOOP operator callback object that does nothing
> > before and after the process method
> > + */
> > +public final class NoopOperatorCallback implements OperatorCallback {
> > +
> > +  @Override
> > +  public Tuple beforeProcess(Tuple tuple, MessageCollector collector,
> > TaskCoordinator coordinator) {
> > +    return tuple;
> > +  }
> > +
> > +  @Override
> > +  public Relation beforeProcess(Relation rel, MessageCollector
> collector,
> > TaskCoordinator coordinator) {
> > +    return rel;
> > +  }
> > +
> > +  @Override
> > +  public Tuple afterProcess(Tuple tuple, MessageCollector collector,
> > TaskCoordinator coordinator) {
> > +    return tuple;
> > +  }
> > +
> > +  @Override
> > +  public Relation afterProcess(Relation rel, MessageCollector collector,
> > TaskCoordinator coordinator) {
> > +    return rel;
> > +  }
> > +
> > +}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
> > new file mode 100644
> > index 0000000..8b70092
> > --- /dev/null
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
> > @@ -0,0 +1,53 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one
> > + * or more contributor license agreements.  See the NOTICE file
> > + * distributed with this work for additional information
> > + * regarding copyright ownership.  The ASF licenses this file
> > + * to you under the Apache License, Version 2.0 (the
> > + * "License"); you may not use this file except in compliance
> > + * with the License.  You may obtain a copy of the License at
> > + *
> > + *   http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing,
> > + * software distributed under the License is distributed on an
> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > + * KIND, either express or implied.  See the License for the
> > + * specific language governing permissions and limitations
> > + * under the License.
> > + */
> > +package org.apache.samza.sql.operators;
> > +
> > +import java.util.Iterator;
> > +
> > +import org.apache.samza.sql.api.data.EntityName;
> > +import org.apache.samza.sql.api.operators.OperatorSink;
> > +import org.apache.samza.sql.api.operators.OperatorSource;
> > +import org.apache.samza.sql.api.operators.SimpleOperator;
> > +
> > +
> > +/**
> > + * This class implements a partially completed {@link
> > org.apache.samza.sql.operators.factory.TopologyBuilder} that signifies a
> > partially completed
> > + * topology that the current operator has unbounded input stream that
> can
> > be attached to other operators' output
> > + */
> > +public class OperatorTopology implements OperatorSource, OperatorSink {
> > +
> > +  private final EntityName name;
> > +  private final SimpleRouter router;
> > +
> > +  public OperatorTopology(EntityName name, SimpleRouter router) {
> > +    this.name = name;
> > +    this.router = router;
> > +  }
> > +
> > +  @Override
> > +  public Iterator<SimpleOperator> opIterator() {
> > +    return this.router.iterator();
> > +  }
> > +
> > +  @Override
> > +  public EntityName getName() {
> > +    return this.name;
> > +  }
> > +
> > +}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
> > new file mode 100644
> > index 0000000..423880b
> > --- /dev/null
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java
> > @@ -0,0 +1,147 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one
> > + * or more contributor license agreements.  See the NOTICE file
> > + * distributed with this work for additional information
> > + * regarding copyright ownership.  The ASF licenses this file
> > + * to you under the Apache License, Version 2.0 (the
> > + * "License"); you may not use this file except in compliance
> > + * with the License.  You may obtain a copy of the License at
> > + *
> > + *   http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing,
> > + * software distributed under the License is distributed on an
> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > + * KIND, either express or implied.  See the License for the
> > + * specific language governing permissions and limitations
> > + * under the License.
> > + */
> > +
> > +package org.apache.samza.sql.operators;
> > +
> > +import org.apache.samza.sql.api.data.Relation;
> > +import org.apache.samza.sql.api.data.Tuple;
> > +import org.apache.samza.sql.api.operators.OperatorCallback;
> > +import org.apache.samza.sql.api.operators.OperatorSpec;
> > +import org.apache.samza.sql.api.operators.SimpleOperator;
> > +import org.apache.samza.task.MessageCollector;
> > +import org.apache.samza.task.TaskCoordinator;
> > +import org.apache.samza.task.sql.SimpleMessageCollector;
> > +
> > +
> > +/**
> > + * An abstract class that encapsulates the basic information and methods
> > that all operator classes should implement.
> > + * It implements the interface {@link
> > org.apache.samza.sql.api.operators.SimpleOperator}
> > + */
> > +public abstract class SimpleOperatorImpl implements SimpleOperator {
> > +  /**
> > +   * The specification of this operator
> > +   */
> > +  private final OperatorSpec spec;
> > +
> > +  /**
> > +<<<<<<< HEAD
> > +   * The callback object
> > +=======
> > +   * The callback function
> > +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of
> > callbacks w/o inheriting and creating many sub-classes from operators
> > +   */
> > +  private final OperatorCallback callback;
> > +
> > +  /**
> > +   * Ctor of {@code SimpleOperatorImpl} class
> > +   *
> > +   * @param spec The specification of this operator
> > +   */
> > +  public SimpleOperatorImpl(OperatorSpec spec) {
> > +    this(spec, new NoopOperatorCallback());
> > +  }
> > +
> > +  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback
> callback)
> > {
> > +    this.spec = spec;
> > +    this.callback = callback;
> > +  }
> > +
> > +  @Override
> > +  public OperatorSpec getSpec() {
> > +    return this.spec;
> > +  }
> > +
> > +  /**
> > +   * This method is made final s.t. the sequence of invocations between
> > {@link
> >
> org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation,
> > MessageCollector, TaskCoordinator)}
> > +   * and real processing of the input relation is fixed.
> > +   */
> > +  @Override
> > +  final public void process(Relation deltaRelation, MessageCollector
> > collector, TaskCoordinator coordinator)
> > +      throws Exception {
> > +    Relation rel = this.callback.beforeProcess(deltaRelation, collector,
> > coordinator);
> > +    if (rel == null) {
> > +      return;
> > +    }
> > +    this.realProcess(rel, getCollector(collector, coordinator),
> > coordinator);
> > +  }
> > +
> > +  /**
> > +   * This method is made final s.t. the sequence of invocations between
> > {@link
> > org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple,
> > MessageCollector, TaskCoordinator)}
> > +   * and real processing of the input tuple is fixed.
> > +   */
> > +  @Override
> > +  final public void process(Tuple tuple, MessageCollector collector,
> > TaskCoordinator coordinator) throws Exception {
> > +    Tuple ituple = this.callback.beforeProcess(tuple, collector,
> > coordinator);
> > +    if (ituple == null) {
> > +      return;
> > +    }
> > +    this.realProcess(ituple, getCollector(collector, coordinator),
> > coordinator);
> > +  }
> > +
> > +  /**
> > +   * This method is made final s.t. we enforce the invocation of {@code
> > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)}
> before
> > doing anything further
> > +   */
> > +  @Override
> > +  final public void refresh(long timeNano, MessageCollector collector,
> > TaskCoordinator coordinator) throws Exception {
> > +    this.realRefresh(timeNano, getCollector(collector, coordinator),
> > coordinator);
> > +  }
> > +
> > +  private SimpleMessageCollector getCollector(MessageCollector
> collector,
> > TaskCoordinator coordinator) {
> > +    if (!(collector instanceof SimpleMessageCollector)) {
> > +      return new SimpleMessageCollector(collector, coordinator,
> > this.callback);
> > +    } else {
> > +      ((SimpleMessageCollector)
> collector).switchCallback(this.callback);
> > +      return (SimpleMessageCollector) collector;
> > +    }
> > +  }
> > +
> > +  /**
> > +   * Method to be overridden by each specific implementation class of
> > operator to handle timeout event
> > +   *
> > +   * @param timeNano The time in nanosecond when the timeout event
> > occurred
> > +   * @param collector The {@link
> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
> > +   * @param coordinator The {@link
> org.apache.samza.task.TaskCoordinator}
> > in the context
> > +   * @throws Exception Throws exception if failed to refresh the results
> > +   */
> > +  protected abstract void realRefresh(long timeNano,
> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> > +      throws Exception;
> > +
> > +  /**
> > +   * Method to be overridden by each specific implementation class of
> > operator to perform relational logic operation on an input {@link
> > org.apache.samza.sql.api.data.Relation}
> > +   *
> > +   * @param rel The input relation
> > +   * @param collector The {@link
> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
> > +   * @param coordinator The {@link
> org.apache.samza.task.TaskCoordinator}
> > in the context
> > +   * @throws Exception Throws exception if failed to process
> > +   */
> > +  protected abstract void realProcess(Relation rel,
> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> > +      throws Exception;
> > +
> > +  /**
> > +   * Method to be overridden by each specific implementation class of
> > operator to perform relational logic operation on an input {@link
> > org.apache.samza.sql.api.data.Tuple}
> > +   *
> > +   * @param ituple The input tuple
> > +   * @param collector The {@link
> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
> > +   * @param coordinator The {@link
> org.apache.samza.task.TaskCoordinator}
> > in the context
> > +   * @throws Exception Throws exception if failed to process
> > +   */
> > +  protected abstract void realProcess(Tuple ituple,
> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> > +      throws Exception;
> > +
> > +}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
> > new file mode 100644
> > index 0000000..691e543
> > --- /dev/null
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java
> > @@ -0,0 +1,106 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one
> > + * or more contributor license agreements.  See the NOTICE file
> > + * distributed with this work for additional information
> > + * regarding copyright ownership.  The ASF licenses this file
> > + * to you under the Apache License, Version 2.0 (the
> > + * "License"); you may not use this file except in compliance
> > + * with the License.  You may obtain a copy of the License at
> > + *
> > + *   http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing,
> > + * software distributed under the License is distributed on an
> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > + * KIND, either express or implied.  See the License for the
> > + * specific language governing permissions and limitations
> > + * under the License.
> > + */
> > +package org.apache.samza.sql.operators;
> > +
> > +import java.util.ArrayList;
> > +import java.util.List;
> > +
> > +import org.apache.samza.sql.api.data.EntityName;
> > +import org.apache.samza.sql.api.operators.OperatorSpec;
> > +
> > +
> > +/**
> > + * An abstract class that encapsulates the basic information and methods
> > that all operator specifications should implement.
> > + * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec}
> > + */
> > +public abstract class SimpleOperatorSpec implements OperatorSpec {
> > +  /**
> > +   * The identifier of the corresponding operator
> > +   */
> > +  private final String id;
> > +
> > +  /**
> > +   * The list of input entity names of the corresponding operator
> > +   */
> > +  private final List<EntityName> inputs = new ArrayList<EntityName>();
> > +
> > +  /**
> > +   * The list of output entity names of the corresponding operator
> > +   */
> > +  private final List<EntityName> outputs = new ArrayList<EntityName>();
> > +
> > +  /**
> > +   * Ctor of the {@code SimpleOperatorSpec} for simple {@link
> > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one
> > output
> > +   *
> > +   * @param id Unique identifier of the {@link
> > org.apache.samza.sql.api.operators.SimpleOperator} object
> > +   * @param input The only input entity
> > +   * @param output The only output entity
> > +   */
> > +  public SimpleOperatorSpec(String id, EntityName input, EntityName
> > output) {
> > +    this.id = id;
> > +    this.inputs.add(input);
> > +    this.outputs.add(output);
> > +  }
> > +
> > +  /**
> > +   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs
> and
> > one output
> > +   *
> > +   * @param id Unique identifier of the {@link
> > org.apache.samza.sql.api.operators.SimpleOperator} object
> > +   * @param inputs The list of input entities
> > +   * @param output The only output entity
> > +   */
> > +  public SimpleOperatorSpec(String id, List<EntityName> inputs,
> > EntityName output) {
> > +    this.id = id;
> > +    this.inputs.addAll(inputs);
> > +    this.outputs.add(output);
> > +  }
> > +
> > +  @Override
> > +  public String getId() {
> > +    return this.id;
> > +  }
> > +
> > +  @Override
> > +  public List<EntityName> getInputNames() {
> > +    return this.inputs;
> > +  }
> > +
> > +  @Override
> > +  public List<EntityName> getOutputNames() {
> > +    return this.outputs;
> > +  }
> > +
> > +  /**
> > +   * Method to get the first output entity
> > +   *
> > +   * @return The first output entity name
> > +   */
> > +  public EntityName getOutputName() {
> > +    return this.outputs.get(0);
> > +  }
> > +
> > +  /**
> > +   * Method to get the first input entity
> > +   *
> > +   * @return The first input entity name
> > +   */
> > +  public EntityName getInputName() {
> > +    return this.inputs.get(0);
> > +  }
> > +}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
> > new file mode 100644
> > index 0000000..2d9a1db
> > --- /dev/null
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java
> > @@ -0,0 +1,141 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one
> > + * or more contributor license agreements.  See the NOTICE file
> > + * distributed with this work for additional information
> > + * regarding copyright ownership.  The ASF licenses this file
> > + * to you under the Apache License, Version 2.0 (the
> > + * "License"); you may not use this file except in compliance
> > + * with the License.  You may obtain a copy of the License at
> > + *
> > + *   http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing,
> > + * software distributed under the License is distributed on an
> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > + * KIND, either express or implied.  See the License for the
> > + * specific language governing permissions and limitations
> > + * under the License.
> > + */
> > +
> > +package org.apache.samza.sql.operators;
> > +
> > +import java.util.ArrayList;
> > +import java.util.HashMap;
> > +import java.util.HashSet;
> > +import java.util.Iterator;
> > +import java.util.List;
> > +import java.util.Map;
> > +import java.util.Set;
> > +
> > +import org.apache.samza.config.Config;
> > +import org.apache.samza.sql.api.data.EntityName;
> > +import org.apache.samza.sql.api.data.Relation;
> > +import org.apache.samza.sql.api.data.Tuple;
> > +import org.apache.samza.sql.api.operators.Operator;
> > +import org.apache.samza.sql.api.operators.OperatorRouter;
> > +import org.apache.samza.sql.api.operators.SimpleOperator;
> > +import org.apache.samza.task.MessageCollector;
> > +import org.apache.samza.task.TaskContext;
> > +import org.apache.samza.task.TaskCoordinator;
> > +import org.apache.samza.task.sql.RouterMessageCollector;
> > +
> > +
> > +/**
> > + * Example implementation of {@link
> > org.apache.samza.sql.api.operators.OperatorRouter}
> > + *
> > + */
> > +public final class SimpleRouter implements OperatorRouter {
> > +  /**
> > +   * List of operators added to the {@link
> > org.apache.samza.sql.api.operators.OperatorRouter}
> > +   */
> > +  private List<SimpleOperator> operators = new
> > ArrayList<SimpleOperator>();
> > +
> > +  @SuppressWarnings("rawtypes")
> > +  /**
> > +   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list
> > of operators associated with it
> > +   */
> > +  private Map<EntityName, List> nextOps = new HashMap<EntityName,
> List>();
> > +
> > +  /**
> > +   * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs
> to
> > this {@code SimpleRouter}
> > +   */
> > +  private Set<EntityName> inputEntities = new HashSet<EntityName>();
> > +
> > +  /**
> > +   * Set of entities that are not input entities to this {@code
> > SimpleRouter}
> > +   */
> > +  private Set<EntityName> outputEntities = new HashSet<EntityName>();
> > +
> > +  @SuppressWarnings("unchecked")
> > +  private void addOperator(EntityName input, SimpleOperator nextOp) {
> > +    if (nextOps.get(input) == null) {
> > +      nextOps.put(input, new ArrayList<Operator>());
> > +    }
> > +    nextOps.get(input).add(nextOp);
> > +    operators.add(nextOp);
> > +    // get the operator spec
> > +    for (EntityName output : nextOp.getSpec().getOutputNames()) {
> > +      if (inputEntities.contains(output)) {
> > +        inputEntities.remove(output);
> > +      }
> > +      outputEntities.add(output);
> > +    }
> > +    if (!outputEntities.contains(input)) {
> > +      inputEntities.add(input);
> > +    }
> > +  }
> > +
> > +  @Override
> > +  @SuppressWarnings("unchecked")
> > +  public List<SimpleOperator> getNextOperators(EntityName entity) {
> > +    return nextOps.get(entity);
> > +  }
> > +
> > +  @Override
> > +  public void addOperator(SimpleOperator nextOp) {
> > +    List<EntityName> inputs = nextOp.getSpec().getInputNames();
> > +    for (EntityName input : inputs) {
> > +      addOperator(input, nextOp);
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void init(Config config, TaskContext context) throws Exception
> {
> > +    for (SimpleOperator op : this.operators) {
> > +      op.init(config, context);
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void process(Tuple ituple, MessageCollector collector,
> > TaskCoordinator coordinator) throws Exception {
> > +    MessageCollector opCollector = new RouterMessageCollector(collector,
> > coordinator, this);
> > +    for (Iterator<SimpleOperator> iter =
> > this.getNextOperators(ituple.getEntityName()).iterator();
> iter.hasNext();) {
> > +      iter.next().process(ituple, opCollector, coordinator);
> > +    }
> > +  }
> > +
> > +  @SuppressWarnings("rawtypes")
> > +  @Override
> > +  public void process(Relation deltaRelation, MessageCollector
> collector,
> > TaskCoordinator coordinator) throws Exception {
> > +    MessageCollector opCollector = new RouterMessageCollector(collector,
> > coordinator, this);
> > +    for (Iterator<SimpleOperator> iter =
> > this.getNextOperators(deltaRelation.getName()).iterator();
> iter.hasNext();)
> > {
> > +      iter.next().process(deltaRelation, opCollector, coordinator);
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void refresh(long nanoSec, MessageCollector collector,
> > TaskCoordinator coordinator) throws Exception {
> > +    MessageCollector opCollector = new RouterMessageCollector(collector,
> > coordinator, this);
> > +    for (EntityName entity : inputEntities) {
> > +      for (Iterator<SimpleOperator> iter =
> > this.getNextOperators(entity).iterator(); iter.hasNext();) {
> > +        iter.next().refresh(nanoSec, opCollector, coordinator);
> > +      }
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public Iterator<SimpleOperator> iterator() {
> > +    return this.operators.iterator();
> > +  }
> > +
> > +}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
> > deleted file mode 100644
> > index c3d2266..0000000
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
> > +++ /dev/null
> > @@ -1,50 +0,0 @@
> > -/*
> > - * Licensed to the Apache Software Foundation (ASF) under one
> > - * or more contributor license agreements.  See the NOTICE file
> > - * distributed with this work for additional information
> > - * regarding copyright ownership.  The ASF licenses this file
> > - * to you under the Apache License, Version 2.0 (the
> > - * "License"); you may not use this file except in compliance
> > - * with the License.  You may obtain a copy of the License at
> > - *
> > - *   http://www.apache.org/licenses/LICENSE-2.0
> > - *
> > - * Unless required by applicable law or agreed to in writing,
> > - * software distributed under the License is distributed on an
> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > - * KIND, either express or implied.  See the License for the
> > - * specific language governing permissions and limitations
> > - * under the License.
> > - */
> > -package org.apache.samza.sql.operators.factory;
> > -
> > -import org.apache.samza.sql.api.data.Relation;
> > -import org.apache.samza.sql.api.data.Tuple;
> > -import org.apache.samza.sql.api.operators.OperatorCallback;
> > -import org.apache.samza.task.MessageCollector;
> > -import org.apache.samza.task.TaskCoordinator;
> > -
> > -
> > -public final class NoopOperatorCallback implements OperatorCallback {
> > -
> > -  @Override
> > -  public Tuple beforeProcess(Tuple tuple, MessageCollector collector,
> > TaskCoordinator coordinator) {
> > -    return tuple;
> > -  }
> > -
> > -  @Override
> > -  public Relation beforeProcess(Relation rel, MessageCollector
> collector,
> > TaskCoordinator coordinator) {
> > -    return rel;
> > -  }
> > -
> > -  @Override
> > -  public Tuple afterProcess(Tuple tuple, MessageCollector collector,
> > TaskCoordinator coordinator) {
> > -    return tuple;
> > -  }
> > -
> > -  @Override
> > -  public Relation afterProcess(Relation rel, MessageCollector collector,
> > TaskCoordinator coordinator) {
> > -    return rel;
> > -  }
> > -
> > -}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
> > deleted file mode 100644
> > index e66451f..0000000
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
> > +++ /dev/null
> > @@ -1,136 +0,0 @@
> > -/*
> > - * Licensed to the Apache Software Foundation (ASF) under one
> > - * or more contributor license agreements.  See the NOTICE file
> > - * distributed with this work for additional information
> > - * regarding copyright ownership.  The ASF licenses this file
> > - * to you under the Apache License, Version 2.0 (the
> > - * "License"); you may not use this file except in compliance
> > - * with the License.  You may obtain a copy of the License at
> > - *
> > - *   http://www.apache.org/licenses/LICENSE-2.0
> > - *
> > - * Unless required by applicable law or agreed to in writing,
> > - * software distributed under the License is distributed on an
> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > - * KIND, either express or implied.  See the License for the
> > - * specific language governing permissions and limitations
> > - * under the License.
> > - */
> > -
> > -package org.apache.samza.sql.operators.factory;
> > -
> > -import org.apache.samza.sql.api.data.Relation;
> > -import org.apache.samza.sql.api.data.Tuple;
> > -import org.apache.samza.sql.api.operators.OperatorCallback;
> > -import org.apache.samza.sql.api.operators.OperatorSpec;
> > -import org.apache.samza.sql.api.operators.SimpleOperator;
> > -import org.apache.samza.task.MessageCollector;
> > -import org.apache.samza.task.TaskCoordinator;
> > -import org.apache.samza.task.sql.SimpleMessageCollector;
> > -
> > -
> > -/**
> > - * An abstract class that encapsulate the basic information and methods
> > that all operator classes should implement.
> > - * It implements the interface {@link
> > org.apache.samza.sql.api.operators.SimpleOperator}
> > - *
> > - */
> > -public abstract class SimpleOperatorImpl implements SimpleOperator {
> > -  /**
> > -   * The specification of this operator
> > -   */
> > -  private final OperatorSpec spec;
> > -
> > -  /**
> > -   * The callback function
> > -   */
> > -  private final OperatorCallback callback;
> > -
> > -  /**
> > -   * Ctor of {@code SimpleOperatorImpl} class
> > -   *
> > -   * @param spec The specification of this operator
> > -   */
> > -  public SimpleOperatorImpl(OperatorSpec spec) {
> > -    this(spec, new NoopOperatorCallback());
> > -  }
> > -
> > -  public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback
> callback)
> > {
> > -    this.spec = spec;
> > -    this.callback = callback;
> > -  }
> > -
> > -  @Override
> > -  public OperatorSpec getSpec() {
> > -    return this.spec;
> > -  }
> > -
> > -  /**
> > -   * This method is made final s.t. the sequence of invocations between
> > {@link
> >
> org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation,
> > MessageCollector, TaskCoordinator)}
> > -   * and real processing of the input relation is fixed.
> > -   */
> > -  @Override
> > -  final public void process(Relation deltaRelation, MessageCollector
> > collector, TaskCoordinator coordinator)
> > -      throws Exception {
> > -    Relation rel = this.callback.beforeProcess(deltaRelation, collector,
> > coordinator);
> > -    if (rel == null) {
> > -      return;
> > -    }
> > -    this.realProcess(rel, getCollector(collector, coordinator),
> > coordinator);
> > -  }
> > -
> > -  /**
> > -   * This method is made final s.t. the sequence of invocations between
> > {@link
> > org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple,
> > MessageCollector, TaskCoordinator)}
> > -   * and real processing of the input tuple is fixed.
> > -   */
> > -  @Override
> > -  final public void process(Tuple tuple, MessageCollector collector,
> > TaskCoordinator coordinator) throws Exception {
> > -    Tuple ituple = this.callback.beforeProcess(tuple, collector,
> > coordinator);
> > -    if (ituple == null) {
> > -      return;
> > -    }
> > -    this.realProcess(ituple, getCollector(collector, coordinator),
> > coordinator);
> > -  }
> > -
> > -  /**
> > -   * This method is made final s.t. we enforce the invocation of {@code
> > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)}
> before
> > doing anything futher
> > -   */
> > -  @Override
> > -  final public void refresh(long timeNano, MessageCollector collector,
> > TaskCoordinator coordinator) throws Exception {
> > -    this.realRefresh(timeNano, getCollector(collector, coordinator),
> > coordinator);
> > -  }
> > -
> > -  private SimpleMessageCollector getCollector(MessageCollector
> collector,
> > TaskCoordinator coordinator) {
> > -    if (!(collector instanceof SimpleMessageCollector)) {
> > -      return new SimpleMessageCollector(collector, coordinator,
> > this.callback);
> > -    } else {
> > -      ((SimpleMessageCollector)
> > collector).switchOperatorCallback(this.callback);
> > -      return (SimpleMessageCollector) collector;
> > -    }
> > -  }
> > -
> > -  /**
> > -   * Method to be overriden by each specific implementation class of
> > operator to handle timeout event
> > -   *
> > -   * @param timeNano The time in nanosecond when the timeout event
> > occurred
> > -   * @param collector The {@link
> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
> > -   * @param coordinator The {@link
> org.apache.samza.task.TaskCoordinator}
> > in the context
> > -   * @throws Exception Throws exception if failed to refresh the results
> > -   */
> > -  protected abstract void realRefresh(long timeNano,
> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> > -      throws Exception;
> > -
> > -  /**
> > -   * Method to be overriden by each specific implementation class of
> > operator to perform relational logic operation on an input {@link
> > org.apache.samza.sql.api.data.Relation}
> > -   *
> > -   * @param rel The input relation
> > -   * @param collector The {@link
> > org.apache.samza.task.sql.SimpleMessageCollector} in the context
> > -   * @param coordinator The {@link
> org.apache.samza.task.TaskCoordinator}
> > in the context
> > -   * @throws Exception
> > -   */
> > -  protected abstract void realProcess(Relation rel,
> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> > -      throws Exception;
> > -
> > -  protected abstract void realProcess(Tuple ituple,
> > SimpleMessageCollector collector, TaskCoordinator coordinator)
> > -      throws Exception;
> > -
> > -}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
> > deleted file mode 100644
> > index 56753b6..0000000
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
> > +++ /dev/null
> > @@ -1,106 +0,0 @@
> > -/*
> > - * Licensed to the Apache Software Foundation (ASF) under one
> > - * or more contributor license agreements.  See the NOTICE file
> > - * distributed with this work for additional information
> > - * regarding copyright ownership.  The ASF licenses this file
> > - * to you under the Apache License, Version 2.0 (the
> > - * "License"); you may not use this file except in compliance
> > - * with the License.  You may obtain a copy of the License at
> > - *
> > - *   http://www.apache.org/licenses/LICENSE-2.0
> > - *
> > - * Unless required by applicable law or agreed to in writing,
> > - * software distributed under the License is distributed on an
> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > - * KIND, either express or implied.  See the License for the
> > - * specific language governing permissions and limitations
> > - * under the License.
> > - */
> > -package org.apache.samza.sql.operators.factory;
> > -
> > -import java.util.ArrayList;
> > -import java.util.List;
> > -
> > -import org.apache.samza.sql.api.data.EntityName;
> > -import org.apache.samza.sql.api.operators.OperatorSpec;
> > -
> > -
> > -/**
> > - * An abstract class that encapsulate the basic information and methods
> > that all specification of operators should implement.
> > - * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec}
> > - */
> > -public abstract class SimpleOperatorSpec implements OperatorSpec {
> > -  /**
> > -   * The identifier of the corresponding operator
> > -   */
> > -  private final String id;
> > -
> > -  /**
> > -   * The list of input entity names of the corresponding operator
> > -   */
> > -  private final List<EntityName> inputs = new ArrayList<EntityName>();
> > -
> > -  /**
> > -   * The list of output entity names of the corresponding operator
> > -   */
> > -  private final List<EntityName> outputs = new ArrayList<EntityName>();
> > -
> > -  /**
> > -   * Ctor of the {@code SimpleOperatorSpec} for simple {@link
> > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one
> > output
> > -   *
> > -   * @param id Unique identifier of the {@link
> > org.apache.samza.sql.api.operators.SimpleOperator} object
> > -   * @param input The only input entity
> > -   * @param output The only output entity
> > -   */
> > -  public SimpleOperatorSpec(String id, EntityName input, EntityName
> > output) {
> > -    this.id = id;
> > -    this.inputs.add(input);
> > -    this.outputs.add(output);
> > -  }
> > -
> > -  /**
> > -   * Ctor of {@code SimpleOperatorSpec} with general format: m inputs
> and
> > n outputs
> > -   *
> > -   * @param id Unique identifier of the {@link
> > org.apache.samza.sql.api.operators.SimpleOperator} object
> > -   * @param inputs The list of input entities
> > -   * @param output The list of output entities
> > -   */
> > -  public SimpleOperatorSpec(String id, List<EntityName> inputs,
> > EntityName output) {
> > -    this.id = id;
> > -    this.inputs.addAll(inputs);
> > -    this.outputs.add(output);
> > -  }
> > -
> > -  @Override
> > -  public String getId() {
> > -    return this.id;
> > -  }
> > -
> > -  @Override
> > -  public List<EntityName> getInputNames() {
> > -    return this.inputs;
> > -  }
> > -
> > -  @Override
> > -  public List<EntityName> getOutputNames() {
> > -    return this.outputs;
> > -  }
> > -
> > -  /**
> > -   * Method to get the first output entity
> > -   *
> > -   * @return The first output entity name
> > -   */
> > -  public EntityName getOutputName() {
> > -    return this.outputs.get(0);
> > -  }
> > -
> > -  /**
> > -   * Method to get the first input entity
> > -   *
> > -   * @return The first input entity name
> > -   */
> > -  public EntityName getInputName() {
> > -    return this.inputs.get(0);
> > -  }
> > -}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
> > deleted file mode 100644
> > index e570897..0000000
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
> > +++ /dev/null
> > @@ -1,136 +0,0 @@
> > -/*
> > - * Licensed to the Apache Software Foundation (ASF) under one
> > - * or more contributor license agreements.  See the NOTICE file
> > - * distributed with this work for additional information
> > - * regarding copyright ownership.  The ASF licenses this file
> > - * to you under the Apache License, Version 2.0 (the
> > - * "License"); you may not use this file except in compliance
> > - * with the License.  You may obtain a copy of the License at
> > - *
> > - *   http://www.apache.org/licenses/LICENSE-2.0
> > - *
> > - * Unless required by applicable law or agreed to in writing,
> > - * software distributed under the License is distributed on an
> > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > - * KIND, either express or implied.  See the License for the
> > - * specific language governing permissions and limitations
> > - * under the License.
> > - */
> > -
> > -package org.apache.samza.sql.operators.factory;
> > -
> > -import java.util.ArrayList;
> > -import java.util.HashMap;
> > -import java.util.HashSet;
> > -import java.util.Iterator;
> > -import java.util.List;
> > -import java.util.Map;
> > -import java.util.Set;
> > -
> > -import org.apache.samza.config.Config;
> > -import org.apache.samza.sql.api.data.EntityName;
> > -import org.apache.samza.sql.api.data.Relation;
> > -import org.apache.samza.sql.api.data.Tuple;
> > -import org.apache.samza.sql.api.operators.Operator;
> > -import org.apache.samza.sql.api.operators.OperatorRouter;
> > -import org.apache.samza.sql.api.operators.SimpleOperator;
> > -import org.apache.samza.task.MessageCollector;
> > -import org.apache.samza.task.TaskContext;
> > -import org.apache.samza.task.TaskCoordinator;
> > -import org.apache.samza.task.sql.RouterMessageCollector;
> > -
> > -
> > -/**
> > - * Example implementation of {@link
> > org.apache.samza.sql.api.operators.OperatorRouter}
> > - *
> > - */
> > -public final class SimpleRouter implements OperatorRouter {
> > -  /**
> > -   * List of operators added to the {@link
> > org.apache.samza.sql.api.operators.OperatorRouter}
> > -   */
> > -  private List<SimpleOperator> operators = new
> > ArrayList<SimpleOperator>();
> > -
> > -  @SuppressWarnings("rawtypes")
> > -  /**
> > -   * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list
> > of operators associated with it
> > -   */
> > -  private Map<EntityName, List> nextOps = new HashMap<EntityName,
> List>();
> > -
> > -  /**
> > -   * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs
> to
> > this {@code SimpleRouter}
> > -   */
> > -  private Set<EntityName> inputEntities = new HashSet<EntityName>();
> > -
> > -  /**
> > -   * Set of entities that are not input entities to this {@code
> > SimpleRouter}
> > -   */
> > -  private Set<EntityName> outputEntities = new HashSet<EntityName>();
> > -
> > -  @SuppressWarnings("unchecked")
> > -  private void addOperator(EntityName input, SimpleOperator nextOp) {
> > -    if (nextOps.get(input) == null) {
> > -      nextOps.put(input, new ArrayList<Operator>());
> > -    }
> > -    nextOps.get(input).add(nextOp);
> > -    operators.add(nextOp);
> > -    // get the operator spec
> > -    for (EntityName output : nextOp.getSpec().getOutputNames()) {
> > -      if (inputEntities.contains(output)) {
> > -        inputEntities.remove(output);
> > -      }
> > -      outputEntities.add(output);
> > -    }
> > -    if (!outputEntities.contains(input)) {
> > -      inputEntities.add(input);
> > -    }
> > -  }
> > -
> > -  @Override
> > -  @SuppressWarnings("unchecked")
> > -  public List<SimpleOperator> getNextOperators(EntityName entity) {
> > -    return nextOps.get(entity);
> > -  }
> > -
> > -  @Override
> > -  public void addOperator(SimpleOperator nextOp) {
> > -    List<EntityName> inputs = nextOp.getSpec().getInputNames();
> > -    for (EntityName input : inputs) {
> > -      addOperator(input, nextOp);
> > -    }
> > -  }
> > -
> > -  @Override
> > -  public void init(Config config, TaskContext context) throws Exception
> {
> > -    for (SimpleOperator op : this.operators) {
> > -      op.init(config, context);
> > -    }
> > -  }
> > -
> > -  @Override
> > -  public void process(Tuple ituple, MessageCollector collector,
> > TaskCoordinator coordinator) throws Exception {
> > -    MessageCollector opCollector = new RouterMessageCollector(collector,
> > coordinator, this);
> > -    for (Iterator<SimpleOperator> iter =
> > this.getNextOperators(ituple.getEntityName()).iterator();
> iter.hasNext();) {
> > -      iter.next().process(ituple, opCollector, coordinator);
> > -    }
> > -  }
> > -
> > -  @SuppressWarnings("rawtypes")
> > -  @Override
> > -  public void process(Relation deltaRelation, MessageCollector
> collector,
> > TaskCoordinator coordinator) throws Exception {
> > -    MessageCollector opCollector = new RouterMessageCollector(collector,
> > coordinator, this);
> > -    for (Iterator<SimpleOperator> iter =
> > this.getNextOperators(deltaRelation.getName()).iterator();
> iter.hasNext();)
> > {
> > -      iter.next().process(deltaRelation, opCollector, coordinator);
> > -    }
> > -  }
> > -
> > -  @Override
> > -  public void refresh(long nanoSec, MessageCollector collector,
> > TaskCoordinator coordinator) throws Exception {
> > -    MessageCollector opCollector = new RouterMessageCollector(collector,
> > coordinator, this);
> > -    for (EntityName entity : inputEntities) {
> > -      for (Iterator<SimpleOperator> iter =
> > this.getNextOperators(entity).iterator(); iter.hasNext();) {
> > -        iter.next().refresh(nanoSec, opCollector, coordinator);
> > -      }
> > -    }
> > -  }
> > -
> > -}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> > new file mode 100644
> > index 0000000..62b19fc
> > --- /dev/null
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
> > @@ -0,0 +1,284 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one
> > + * or more contributor license agreements.  See the NOTICE file
> > + * distributed with this work for additional information
> > + * regarding copyright ownership.  The ASF licenses this file
> > + * to you under the Apache License, Version 2.0 (the
> > + * "License"); you may not use this file except in compliance
> > + * with the License.  You may obtain a copy of the License at
> > + *
> > + *   http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing,
> > + * software distributed under the License is distributed on an
> > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > + * KIND, either express or implied.  See the License for the
> > + * specific language governing permissions and limitations
> > + * under the License.
> > + */
> > +package org.apache.samza.sql.operators.factory;
> > +
> > +import java.util.HashMap;
> > +import java.util.HashSet;
> > +import java.util.Iterator;
> > +import java.util.List;
> > +import java.util.Map;
> > +import java.util.Set;
> > +
> > +import org.apache.samza.sql.api.data.EntityName;
> > +import org.apache.samza.sql.api.operators.OperatorRouter;
> > +import org.apache.samza.sql.api.operators.OperatorSink;
> > +import org.apache.samza.sql.api.operators.OperatorSource;
> > +import org.apache.samza.sql.api.operators.OperatorSpec;
> > +import org.apache.samza.sql.api.operators.SimpleOperator;
> > +import org.apache.samza.sql.api.operators.SqlOperatorFactory;
> > +import org.apache.samza.sql.operators.OperatorTopology;
> > +import org.apache.samza.sql.operators.SimpleRouter;
> > +
> > +
> > +/**
> > + * This class implements a builder to allow user to create the operators
> > and connect them in a topology altogether.
> > + */
> > +public class TopologyBuilder {
> > +
> > +  /**
> > +   * Internal {@link org.apache.samza.sql.api.operators.OperatorRouter}
> > object to retain the topology being created
> > +   */
> > +  private SimpleRouter router;
> > +
> > +  /**
> > +   * The {@link org.apache.samza.sql.api.operators.SqlOperatorFactory}
> > object used to create operators connected in the topology
> > +   */
> > +  private final SqlOperatorFactory factory;
> > +
> > +  /**
> > +   * The map of unbound inputs, the value is set(input_operators)
> > +   */
> > +  private Map<EntityName, Set<OperatorSpec>> unboundInputs = new
> > HashMap<EntityName, Set<OperatorSpec>>();
> > +
> > +  /**
> > +   * The map of unbound outputs, the value is the operator generating
> the
> > output
> > +   */
> > +  private Map<EntityName, OperatorSpec> unboundOutputs = new
> > HashMap<EntityName, OperatorSpec>();
> > +
> > +  /**
> > +   * The set of entities that are intermediate entities between
> operators
> > +   */
> > +  private Set<EntityName> interStreams = new HashSet<EntityName>();
> > +
> > +  /**
> > +   * The current operator that may have unbound input or output
> > +   */
> > +  private SimpleOperator currentOp = null;
> > +
> > +  /**
> > +   * Private constructor of {@code TopologyBuilder}
> > +   *
> > +   * @param factory The {@link
> > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create
> operators
> > +   */
> > +  private TopologyBuilder(SqlOperatorFactory factory) {
> > +    this.router = new SimpleRouter();
> > +    this.factory = factory;
> > +  }
> > +
> > +  /**
> > +   * Static method to create this {@code TopologyBuilder} w/ a
> customized
> > {@link org.apache.samza.sql.api.operators.SqlOperatorFactory}
> > +   *
> > +   * @param factory The {@link
> > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create
> operators
> > +   * @return The {@code TopologyBuilder} object
> > +   */
> > +  public static TopologyBuilder create(SqlOperatorFactory factory) {
> > +    return new TopologyBuilder(factory);
> > +  }
> > +
> > +  /**
> > +   * Static method to create this {@code TopologyBuilder}
> > +   *
> > +   * @return The {@code TopologyBuilder} object
> > +   */
> > +  public static TopologyBuilder create() {
> > +    return new TopologyBuilder(new SimpleOperatorFactoryImpl());
> > +  }
> > +
> > +  /**
> > +   * Public method to create the next operator and attach it to the
> > output of the current operator
> > +   *
> > +   * @param spec The {@link
> > org.apache.samza.sql.api.operators.OperatorSpec} for the next operator
> > +   * @return The updated {@code TopologyBuilder} object
> > +   */
> > +  public TopologyBuilder operator(OperatorSpec spec) {
> > +    // check whether it is valid to connect a new operator to the
> current
> > operator's output
> > +    SimpleOperator nextOp = this.factory.getOperator(spec);
> > +    return this.operator(nextOp);
> > +  }
> > +
> > +  /**
> > +   * Public method to create the next operator and attach it to the
> > output of the current operator
> > +   *
> > +   * @param op The {@link
> > org.apache.samza.sql.api.operators.SimpleOperator}
> > +   * @return The updated {@code TopologyBuilder} object
> > +   */
> > +  public TopologyBuilder operator(SimpleOperator op) {
> > +    // check whether it is valid to connect a new operator to the
> current
> > operator's output
> > +    canAddOperator(op);
> > +    this.addOperator(op);
> > +    // advance the current operator position
> > +    this.currentOp = op;
> > +    return this;
> > +  }
> > +
> > +  /**
> > +   * Public method to create a stream object that will be the source to
> > other operators
> > +   *
> > +   * @return The {@link
> > org.apache.samza.sql.api.operators.OperatorSource} that can be the source
> > to other operators
> > +   */
> > +  public OperatorSource stream() {
> > +    canCreateSource();
> > +    return new
> > OperatorTopology(this.unboundOutputs.keySet().iterator().next(),
> > this.router);
> > +  }
> > +
> > +  /**
> > +   * Public method to create a sink object that can take input stream
> > from other operators
> > +   *
> > +   * @return The {@link org.apache.samza.sql.api.operators.OperatorSink}
> > that can be the downstream of other operators
> > +   */
> > +  public OperatorSink sink() {
> > +    canCreateSink();
> > +    return new
> > OperatorTopology(this.unboundInputs.keySet().iterator().next(),
> > this.router);
> > +  }
> > +
> > +  /**
> > +   * Public method to bind the input of the current operator w/ the
> > {@link org.apache.samza.sql.api.operators.OperatorSource} object
> > +   *
> > +   * @param srcStream The {@link
> > org.apache.samza.sql.api.operators.OperatorSource} that the current
> > operator is going to be bound to
> > +   * @return The updated {@code TopologyBuilder} object
> > +   */
> > +  public TopologyBuilder bind(OperatorSource srcStream) {
> > +    EntityName streamName = srcStream.getName();
> > +    if (this.unboundInputs.containsKey(streamName)) {
> > +      this.unboundInputs.remove(streamName);
> > +      this.interStreams.add(streamName);
> > +    } else {
> > +      // no input operator is waiting for the output from the srcStream
> > +      throw new IllegalArgumentException("No operator input can be bound
> > to the input stream " + streamName);
> > +    }
> > +    // add all operators in srcStream to this topology
> > +    for (Iterator<SimpleOperator> iter = srcStream.opIterator();
> > iter.hasNext();) {
> > +      this.addOperator(iter.next());
> > +    }
> > +    return this;
> > +  }
> > +
> > +  /**
> > +   * Public method to attach a {@link
> > org.apache.samza.sql.api.operators.OperatorSink} object to the output of
> > the current operator
> > +   *
> > +   * @param nextSink The {@link
> > org.apache.samza.sql.api.operators.OperatorSink} to be attached to the
> > current operator's output
> > +   * @return The updated {@code TopologyBuilder} object
> > +   */
> > +  public TopologyBuilder attach(OperatorSink nextSink) {
> > +    EntityName streamName = nextSink.getName();
> > +    if (this.unboundOutputs.containsKey(streamName)) {
> > +      this.unboundOutputs.remove(streamName);
> > +      this.interStreams.add(streamName);
> > +    } else {
> > +      // no unbound output to attach to
> > +      throw new IllegalArgumentException("No operator output found to
> > attach the sink " + streamName);
> > +    }
> > +    // add all operators in nextSink to the router
> > +    for (Iterator<SimpleOperator> iter = nextSink.opIterator();
> > iter.hasNext();) {
> > +      this.addOperator(iter.next());
> > +    }
> > +    return this;
> > +  }
> > +
> > +  /**
> > +   * Public method to finalize the topology that should have all input
> > and output bound to system input and output
> > +   *
> > +   * @return The finalized {@link
> > org.apache.samza.sql.api.operators.OperatorRouter} object
> > +   */
> > +  public OperatorRouter build() {
> > +    canClose();
> > +    return router;
> > +  }
> > +
> > +  private TopologyBuilder addOperator(SimpleOperator nextOp) {
> > +    // if input is not in the unboundOutputs and interStreams, input is
> > unbound
> > +    for (EntityName in : nextOp.getSpec().getInputNames()) {
> > +      if (this.unboundOutputs.containsKey(in)) {
> > +        this.unboundOutputs.remove(in);
> > +        this.interStreams.add(in);
> > +      }
> > +      if (!this.interStreams.contains(in) && !in.isSystemEntity()) {
> > +        if (!this.unboundInputs.containsKey(in)) {
> > +          this.unboundInputs.put(in, new HashSet<OperatorSpec>());
> > +        }
> > +        this.unboundInputs.get(in).add(nextOp.getSpec());
> > +      }
> > +    }
> > +    // if output is not in the unboundInputs and interStreams, output is
> > unbound
> > +    for (EntityName out : nextOp.getSpec().getOutputNames()) {
> > +      if (this.unboundInputs.containsKey(out)) {
> > +        this.unboundInputs.remove(out);
> > +        this.interStreams.add(out);
> > +      }
> > +      if (!this.interStreams.contains(out) && !out.isSystemEntity()) {
> > +        this.unboundOutputs.put(out, nextOp.getSpec());
> > +      }
> > +    }
> > +    try {
> > +      this.router.addOperator(nextOp);
> > +    } catch (Exception e) {
> > +      throw new RuntimeException("Failed to add operator " +
> > nextOp.getSpec().getId() + " to the topology.", e);
> > +    }
> > +    return this;
> > +  }
> > +
> > +  private void canCreateSource() {
> > +    if (this.unboundInputs.size() > 0) {
> > +      throw new IllegalStateException("Can't create stream when there
> are
> > unbounded input streams in the topology");
> > +    }
> > +    if (this.unboundOutputs.size() != 1) {
> > +      throw new IllegalStateException(
> > +          "Can't create stream when the number of unbounded outputs is
> > not 1 in the topology");
> > +    }
> > +  }
> > +
> > +  private void canCreateSink() {
> > +    if (this.unboundOutputs.size() > 0) {
> > +      throw new IllegalStateException("Can't create sink when there are
> > unbounded output streams in the topology");
> > +    }
> > +    if (this.unboundInputs.size() != 1) {
> > +      throw new IllegalStateException(
> > +          "Can't create sink when the number of unbounded input streams
> > is not 1 in the topology");
> > +    }
> > +  }
> > +
> > +  private void canAddOperator(SimpleOperator op) {
> > +    if (this.currentOp == null) {
> > +      return;
> > +    }
> > +    for (EntityName name : this.currentOp.getSpec().getInputNames()) {
> > +      if (this.unboundInputs.containsKey(name)) {
> > +        throw new IllegalArgumentException("There are unbound input " +
> > name + " to the current operator "
> > +            + this.currentOp.getSpec().getId() + ". Create a sink or
> call
> > bind instead");
> > +      }
> > +    }
> > +    List<EntityName> nextInputs = op.getSpec().getInputNames();
> > +    for (EntityName name : this.currentOp.getSpec().getOutputNames()) {
> > +      if (!nextInputs.contains(name) &&
> > this.unboundOutputs.containsKey(name)) {
> > +        // the current operator's output is not in the next operator's
> > input list
> > +        throw new IllegalArgumentException("There are unbound output " +
> > name + " from the current operator "
> > +            + this.currentOp.getSpec().getId()
> > +            + " that are not included in the next operator's inputs.
> > Create a stream or call attach instead");
> > +      }
> > +    }
> > +  }
> > +
> > +  private void canClose() {
> > +    if (!this.unboundInputs.isEmpty() ||
> !this.unboundOutputs.isEmpty()) {
> > +      throw new IllegalStateException(
> > +          "There are input/output streams in the topology that are not
> > bounded. Can't build the topology yet.");
> > +    }
> > +  }
> > +
> > +}
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> > index 2854aeb..7f5b990 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
> > @@ -29,7 +29,7 @@ import org.apache.samza.sql.api.data.Relation;
> >  import org.apache.samza.sql.api.data.Stream;
> >  import org.apache.samza.sql.api.data.Tuple;
> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> >  import org.apache.samza.sql.window.storage.OrderedStoreKey;
> >  import org.apache.samza.storage.kv.Entry;
> > @@ -38,7 +38,6 @@ import org.apache.samza.task.TaskContext;
> >  import org.apache.samza.task.TaskCoordinator;
> >  import org.apache.samza.task.sql.SimpleMessageCollector;
> >
> > -
> >  /**
> >   * This class implements a simple stream-to-stream join
> >   */
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> > index cc0aca0..eecff7e 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
> > @@ -19,10 +19,11 @@
> >
> >  package org.apache.samza.sql.operators.join;
> >
> > +import java.util.ArrayList;
> >  import java.util.List;
> >
> >  import org.apache.samza.sql.api.data.EntityName;
> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
> >
> >
> >  /**
> > @@ -35,4 +36,16 @@ public class StreamStreamJoinSpec extends
> > SimpleOperatorSpec {
> >      // TODO Auto-generated constructor stub
> >    }
> >
> > +  @SuppressWarnings("serial")
> > +  public StreamStreamJoinSpec(String id, List<String> inputRelations,
> > String output, List<String> joinKeys) {
> > +    super(id, new ArrayList<EntityName>() {
> > +      {
> > +        for (String input : inputRelations) {
> > +          add(EntityName.getStreamName(input));
> > +        }
> > +      }
> > +    }, EntityName.getStreamName(output));
> > +    // TODO Auto-generated constructor stub
> > +  }
> > +
> >  }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> > index b93d789..0cba39a 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
> > @@ -23,7 +23,7 @@ import org.apache.samza.config.Config;
> >  import org.apache.samza.sql.api.data.Relation;
> >  import org.apache.samza.sql.api.data.Tuple;
> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
> >  import org.apache.samza.storage.kv.Entry;
> >  import org.apache.samza.storage.kv.KeyValueIterator;
> >  import org.apache.samza.system.OutgoingMessageEnvelope;
> > @@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext;
> >  import org.apache.samza.task.TaskCoordinator;
> >  import org.apache.samza.task.sql.SimpleMessageCollector;
> >
> > -
> >  /**
> >   * This is an example build-in operator that performs a simple stream
> > re-partition operation.
> >   *
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> > index c47eed9..e494bff 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
> > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.partition;
> >
> >  import org.apache.samza.sql.api.data.EntityName;
> >  import org.apache.samza.sql.api.operators.OperatorSpec;
> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
> >  import org.apache.samza.system.SystemStream;
> >
> >
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> > index d81cc93..a9a83b5 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
> > @@ -27,13 +27,12 @@ import org.apache.samza.sql.api.data.EntityName;
> >  import org.apache.samza.sql.api.data.Relation;
> >  import org.apache.samza.sql.api.data.Tuple;
> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl;
> > +import org.apache.samza.sql.operators.SimpleOperatorImpl;
> >  import org.apache.samza.storage.kv.KeyValueIterator;
> >  import org.apache.samza.task.TaskContext;
> >  import org.apache.samza.task.TaskCoordinator;
> >  import org.apache.samza.task.sql.SimpleMessageCollector;
> >
> > -
> >  /**
> >   * This class defines an example build-in operator for a fixed size
> > window operator that converts a stream to a relation
> >   *
> > @@ -86,6 +85,7 @@ public class BoundedTimeWindow extends
> > SimpleOperatorImpl {
> >     * @param lengthSec The window size in seconds
> >     * @param input The input stream name
> >     * @param output The output relation name
> > +   * @param callback The user callback object
> >     */
> >    public BoundedTimeWindow(String wndId, int lengthSec, String input,
> > String output, OperatorCallback callback) {
> >      super(new WindowSpec(wndId, EntityName.getStreamName(input),
> > EntityName.getStreamName(output), lengthSec), callback);
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> > index eec32ea..6c4eba8 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
> > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.window;
> >
> >  import org.apache.samza.sql.api.data.EntityName;
> >  import org.apache.samza.sql.api.operators.OperatorSpec;
> > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec;
> > +import org.apache.samza.sql.operators.SimpleOperatorSpec;
> >
> >
> >  /**
> > @@ -47,6 +47,11 @@ public class WindowSpec extends SimpleOperatorSpec
> > implements OperatorSpec {
> >      this.wndSizeSec = lengthSec;
> >    }
> >
> > +  public WindowSpec(String id, int wndSize, String input) {
> > +    super(id, EntityName.getStreamName(input), null);
> > +    this.wndSizeSec = wndSize;
> > +  }
> > +
> >    /**
> >     * Method to get the window state relation name
> >     *
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> >
> b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> > index b29838a..6950f67 100644
> > ---
> >
> a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> > +++
> >
> b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
> > @@ -22,7 +22,7 @@ package org.apache.samza.task.sql;
> >  import org.apache.samza.sql.api.data.Relation;
> >  import org.apache.samza.sql.api.data.Tuple;
> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> > -import org.apache.samza.sql.operators.factory.NoopOperatorCallback;
> > +import org.apache.samza.sql.operators.NoopOperatorCallback;
> >  import org.apache.samza.storage.kv.Entry;
> >  import org.apache.samza.storage.kv.KeyValueIterator;
> >  import org.apache.samza.system.OutgoingMessageEnvelope;
> > @@ -57,25 +57,38 @@ public class SimpleMessageCollector implements
> > MessageCollector {
> >     * @param coordinator The {@link
> org.apache.samza.task.TaskCoordinator}
> > in the context
> >     */
> >    public SimpleMessageCollector(MessageCollector collector,
> > TaskCoordinator coordinator) {
> > -    this.collector = collector;
> > -    this.coordinator = coordinator;
> > +    this(collector, coordinator, new NoopOperatorCallback());
> >    }
> >
> >    /**
> >     * This method swaps the {@code callback} with the new one
> >     *
> > -   * <p> This method allows the {@link
> > org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when the
> > collector
> > -   * is passed down into the next operator's context. Hence, under the
> > new operator's context, the correct {@link
> >
> org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation,
> > MessageCollector, TaskCoordinator)},
> > -   * and {@link
> > org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple,
> > MessageCollector, TaskCoordinator)} can be invoked
> > +   * <p> This method allows the {@link
> > org.apache.samza.sql.api.operators.OperatorCallback} to be swapped when
> the
> > collector
> > +   * is passed down into the next operator's context. Hence, under the
> > new operator's context, the correct callback functions can be invoked
> >     *
> >     * @param callback The new {@link
> > org.apache.samza.sql.api.operators.OperatorCallback} to be set
> >     */
> > -  public void switchOperatorCallback(OperatorCallback callback) {
> > -    this.callback = callback;
> > +  public void switchCallback(OperatorCallback callback) {
> > +    if (callback == null) {
> > +      this.callback = new NoopOperatorCallback();
> > +    } else {
> > +      this.callback = callback;
> > +    }
> > +  }
> > +
> > +  /**
> > +   * Method is declared to be final s.t. we enforce that the callback
> > functions are called first
> > +   */
> > +  @Override
> > +  final public void send(OutgoingMessageEnvelope envelope) {
> > +    this.collector.send(envelope);
> >    }
> >
> >    /**
> >     * Method is declared to be final s.t. we enforce that the callback
> > functions are called first
> > +   *
> > +   * @param deltaRelation The relation to be sent out
> > +   * @throws Exception Throws exception if failed to send
> >     */
> >    final public void send(Relation deltaRelation) throws Exception {
> >      Relation rel = this.callback.afterProcess(deltaRelation, collector,
> > coordinator);
> > @@ -87,6 +100,9 @@ public class SimpleMessageCollector implements
> > MessageCollector {
> >
> >    /**
> >     * Method is declared to be final s.t. we enforce that the callback
> > functions are called first
> > +   *
> > +   * @param tuple The tuple to be sent out
> > +   * @throws Exception Throws exception if failed to send
> >     */
> >    final public void send(Tuple tuple) throws Exception {
> >      Tuple otuple = this.callback.afterProcess(tuple, collector,
> > coordinator);
> > @@ -106,9 +122,4 @@ public class SimpleMessageCollector implements
> > MessageCollector {
> >    protected void realSend(Tuple tuple) throws Exception {
> >      this.collector.send((OutgoingMessageEnvelope) tuple.getMessage());
> >    }
> > -
> > -  @Override
> > -  public void send(OutgoingMessageEnvelope envelope) {
> > -    this.collector.send(envelope);
> > -  }
> >  }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> >
> b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> > index 20dc701..7370af6 100644
> > ---
> >
> a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> > +++
> >
> b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
> > @@ -22,6 +22,7 @@ package org.apache.samza.task.sql;
> >  import org.apache.samza.config.Config;
> >  import org.apache.samza.sql.api.data.Relation;
> >  import org.apache.samza.sql.api.data.Tuple;
> > +import org.apache.samza.sql.api.operators.Operator;
> >  import org.apache.samza.sql.api.operators.OperatorCallback;
> >  import org.apache.samza.sql.data.IncomingMessageTuple;
> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> > @@ -39,7 +40,7 @@ import org.apache.samza.task.WindowableTask;
> >   *
> >   */
> >  public class RandomWindowOperatorTask implements StreamTask,
> > InitableTask, WindowableTask {
> > -  private BoundedTimeWindow wndOp;
> > +  private Operator operator;
> >
> >    private final OperatorCallback wndCallback = new OperatorCallback() {
> >
> > @@ -77,20 +78,20 @@ public class RandomWindowOperatorTask implements
> > StreamTask, InitableTask, Windo
> >    public void process(IncomingMessageEnvelope envelope, MessageCollector
> > collector, TaskCoordinator coordinator)
> >        throws Exception {
> >      // based on tuple's stream name, get the window op and run process()
> > -    wndOp.process(new IncomingMessageTuple(envelope), collector,
> > coordinator);
> > +    operator.process(new IncomingMessageTuple(envelope), collector,
> > coordinator);
> >
> >    }
> >
> >    @Override
> >    public void window(MessageCollector collector, TaskCoordinator
> > coordinator) throws Exception {
> >      // based on tuple's stream name, get the window op and run process()
> > -    wndOp.refresh(System.nanoTime(), collector, coordinator);
> > +    operator.refresh(System.nanoTime(), collector, coordinator);
> >    }
> >
> >    @Override
> >    public void init(Config config, TaskContext context) throws Exception
> {
> >      // 1. create a fixed length 10 sec window operator
> > -    this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1",
> > "relation1", this.wndCallback);
> > -    this.wndOp.init(config, context);
> > +    this.operator = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1",
> > "wndOutput", this.wndCallback);
> > +    this.operator.init(config, context);
> >    }
> >  }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> > ----------------------------------------------------------------------
> > diff --git
> >
> a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> >
> b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> > index 9124e3c..d65892c 100644
> > ---
> >
> a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> > +++
> >
> b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java
> > @@ -24,7 +24,7 @@ import java.util.List;
> >
> >  import org.apache.samza.config.Config;
> >  import org.apache.samza.sql.data.IncomingMessageTuple;
> > -import org.apache.samza.sql.operators.factory.SimpleRouter;
> > +import org.apache.samza.sql.operators.SimpleRouter;
> >  import org.apache.samza.sql.operators.join.StreamStreamJoin;
> >  import org.apache.samza.sql.operators.partition.PartitionOp;
> >  import org.apache.samza.sql.operators.window.BoundedTimeWindow;
> > @@ -51,25 +51,25 @@ import org.apache.samza.task.WindowableTask;
> >   */
> >  public class StreamSqlTask implements StreamTask, InitableTask,
> > WindowableTask {
> >
> > -  private SimpleRouter rteCntx;
> > +  private SimpleRouter router;
> >
> >    @Override
> >    public void process(IncomingMessageEnvelope envelope, MessageCollector
> > collector, TaskCoordinator coordinator)
> >        throws Exception {
> > -    this.rteCntx.process(new IncomingMessageTuple(envelope), collector,
> > coordinator);
> > +    this.router.process(new IncomingMessageTuple(envelope), collector,
> > coordinator);
> >    }
> >
> >    @Override
> >    public void window(MessageCollector collector, TaskCoordinator
> > coordinator) throws Exception {
> > -    this.rteCntx.refresh(System.nanoTime(), collector, coordinator);
> > +    this.router.refresh(System.nanoTime(), collector, coordinator);
> >    }
> >
> >    @Override
> >    public void init(Config config, TaskContext context) throws Exception
> {
> >      // create all operators via the operator factory
> >      // 1. create two window operators
> > -    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10,
> > "inputStream1", "fixedWndOutput1");
> > -    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10,
> > "inputStream2", "fixedWndOutput2");
> > +    BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10,
> > "kafka:inputStream1", "fixedWndOutput1");
> > +    BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10,
> > "kafka:inputStream2", "fixedWndOutput2");
> >      // 2. create one join operator
> >      @SuppressWarnings("serial")
> >      List<String> inputRelations = new ArrayList<String>() {
> > @@ -86,19 +86,19 @@ public class StreamSqlTask implements StreamTask,
> > InitableTask, WindowableTask {
> >        }
> >      };
> >      StreamStreamJoin join = new StreamStreamJoin("joinOp",
> > inputRelations, "joinOutput", joinKeys);
> > -    // 4. create a re-partition operator
> > +    // 3. create a re-partition operator
> >      PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka",
> > "parOutputStrm1", "joinKey", 50);
> >
> >      // Now, connecting the operators via the OperatorRouter
> > -    this.rteCntx = new SimpleRouter();
> > +    this.router = new SimpleRouter();
> >      // 1. set two system input operators (i.e. two window operators)
> > -    this.rteCntx.addOperator(wnd1);
> > -    this.rteCntx.addOperator(wnd2);
> > +    this.router.addOperator(wnd1);
> > +    this.router.addOperator(wnd2);
> >      // 2. connect join operator to both window operators
> > -    this.rteCntx.addOperator(join);
> > +    this.router.addOperator(join);
> >      // 3. connect re-partition operator to the stream operator
> > -    this.rteCntx.addOperator(par);
> > +    this.router.addOperator(par);
> >
> > -    this.rteCntx.init(config, context);
> > +    this.router.init(config, context);
> >    }
> >  }
> >
> >
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>

Reply via email to