Hi Milinda, That check-in was accidental, and I have reverted it. I am still working on that patch. Thanks!
-Yi On Mon, Jun 1, 2015 at 9:34 PM, Milinda Pathirage <mpath...@umail.iu.edu> wrote: > Hi Navina, > > Did we decide to push this patch to the samza-sql branch? I thought Yi was > still working on this. Some Git conflict markers are still present in > this commit. > > +<<<<<<< HEAD > + * The callback object > +======= > + * The callback function > +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of > callbacks w/o inheriting and creating many sub-classes from operators > > Milinda > > On Mon, Jun 1, 2015 at 9:06 PM, <nav...@apache.org> wrote: > > > Yi's TopologyBuilder RB 34500 > > > > > > Project: http://git-wip-us.apache.org/repos/asf/samza/repo > > Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/45b85477 > > Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/45b85477 > > Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/45b85477 > > > > Branch: refs/heads/samza-sql > > Commit: 45b854772cf36cc69e8d8cda7a51bce1be5fe576 > > Parents: 41c4cd0 > > Author: Navina <navi.trin...@gmail.com> > > Authored: Thu May 28 18:51:30 2015 -0700 > > Committer: Navina <navi.trin...@gmail.com> > > Committed: Thu May 28 18:51:30 2015 -0700 > > > > ---------------------------------------------------------------------- > > .../apache/samza/sql/api/data/EntityName.java | 41 ++- > > .../org/apache/samza/sql/api/data/Table.java | 7 +- > > .../samza/sql/api/operators/Operator.java | 4 + > > .../sql/api/operators/OperatorCallback.java | 1 - > > .../samza/sql/api/operators/OperatorRouter.java | 8 + > > .../samza/sql/api/operators/OperatorSink.java | 30 ++ > > .../samza/sql/api/operators/OperatorSource.java | 30 ++ > > .../samza/sql/api/operators/SimpleOperator.java | 3 +- > > .../samza/sql/data/IncomingMessageTuple.java | 1 - > > .../sql/operators/NoopOperatorCallback.java | 53 ++++ > > .../samza/sql/operators/OperatorTopology.java | 53 ++++ > > .../samza/sql/operators/SimpleOperatorImpl.java | 147 ++++++++++ > > 
.../samza/sql/operators/SimpleOperatorSpec.java | 106 +++++++ > > .../samza/sql/operators/SimpleRouter.java | 141 +++++++++ > > .../operators/factory/NoopOperatorCallback.java | 50 ---- > > .../operators/factory/SimpleOperatorImpl.java | 136 --------- > > .../operators/factory/SimpleOperatorSpec.java | 106 ------- > > .../sql/operators/factory/SimpleRouter.java | 136 --------- > > .../sql/operators/factory/TopologyBuilder.java | 284 > +++++++++++++++++++ > > .../sql/operators/join/StreamStreamJoin.java | 3 +- > > .../operators/join/StreamStreamJoinSpec.java | 15 +- > > .../sql/operators/partition/PartitionOp.java | 3 +- > > .../sql/operators/partition/PartitionSpec.java | 2 +- > > .../sql/operators/window/BoundedTimeWindow.java | 4 +- > > .../samza/sql/operators/window/WindowSpec.java | 7 +- > > .../samza/task/sql/SimpleMessageCollector.java | 37 ++- > > .../task/sql/RandomWindowOperatorTask.java | 11 +- > > .../apache/samza/task/sql/StreamSqlTask.java | 26 +- > > .../samza/task/sql/UserCallbacksSqlTask.java | 66 ++--- > > 29 files changed, 991 insertions(+), 520 deletions(-) > > ---------------------------------------------------------------------- > > > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > > index 80ba455..df1b11b 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > > @@ -49,6 +49,8 @@ public class EntityName { > > */ > > private final String name; > > > > + private final boolean isSystemEntity; > > + > > /** > > * Static map of already allocated table names > > */ > 
> @@ -59,15 +61,19 @@ public class EntityName { > > */ > > private static Map<String, EntityName> streams = new HashMap<String, > > EntityName>(); > > > > + private static final String ANONYMOUS = "anonymous"; > > + > > /** > > * Private ctor to create entity names > > * > > * @param type Type of the entity name > > * @param name Formatted name of the entity > > + * @param isSystemEntity whether the entity is a system input/output > > */ > > - private EntityName(EntityType type, String name) { > > + private EntityName(EntityType type, String name, boolean > > isSystemEntity) { > > this.type = type; > > this.name = name; > > + this.isSystemEntity = isSystemEntity; > > } > > > > @Override > > @@ -102,6 +108,10 @@ public class EntityName { > > return this.type.equals(EntityType.STREAM); > > } > > > > + public boolean isSystemEntity() { > > + return this.isSystemEntity; > > + } > > + > > /** > > * Get the formatted entity name > > * > > @@ -111,15 +121,24 @@ public class EntityName { > > return this.name; > > } > > > > + public static EntityName getTableName(String name) { > > + return getTableName(name, false); > > + } > > + > > + public static EntityName getStreamName(String name) { > > + return getStreamName(name, false); > > + } > > + > > /** > > * Static method to get the instance of {@code EntityName} with type > > {@code EntityType.TABLE} > > * > > * @param name The formatted entity name of the relation > > + * @param isSystem The boolean flag indicating whether this is a > system > > input/output > > * @return A <code>EntityName</code> for a relation > > */ > > - public static EntityName getTableName(String name) { > > + public static EntityName getTableName(String name, boolean isSystem) { > > if (tables.get(name) == null) { > > - tables.put(name, new EntityName(EntityType.TABLE, name)); > > + tables.put(name, new EntityName(EntityType.TABLE, name, > isSystem)); > > } > > return tables.get(name); > > } > > @@ -128,13 +147,25 @@ public class EntityName { > > * 
Static method to get the instance of <code>EntityName</code> with > > type <code>EntityType.STREAM</code> > > * > > * @param name The formatted entity name of the stream > > + * @param isSystem The boolean flag indicating whether this is a > system > > input/output > > * @return A <code>EntityName</code> for a stream > > */ > > - public static EntityName getStreamName(String name) { > > + public static EntityName getStreamName(String name, boolean isSystem) > { > > if (streams.get(name) == null) { > > - streams.put(name, new EntityName(EntityType.STREAM, name)); > > + streams.put(name, new EntityName(EntityType.STREAM, name, > > isSystem)); > > } > > return streams.get(name); > > } > > > > + public static EntityName getAnonymousStream() { > > + return getStreamName(ANONYMOUS); > > + } > > + > > + public static EntityName getAnonymousTable() { > > + return getTableName(ANONYMOUS); > > + } > > + > > + public boolean isAnonymous() { > > + return this.name.equals(ANONYMOUS); > > + } > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > > ---------------------------------------------------------------------- > > diff --git > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > > index 7b4d984..b4dce07 100644 > > --- > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > > +++ > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > > @@ -19,6 +19,9 @@ > > > > package org.apache.samza.sql.api.data; > > > > +import java.util.List; > > + > > + > > /** > > * This interface defines a non-ordered {@link > > org.apache.samza.sql.api.data.Relation}, which has a unique primary key > > * > > @@ -31,8 +34,8 @@ public interface Table<K> extends Relation<K> { > > /** > > * Get the primary key field name for this table > > * > > - * @return The name of 
the primary key field > > + * @return The names of the primary key fields > > */ > > - String getPrimaryKeyName(); > > + List<String> getPrimaryKeyNames(); > > > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > > index d6f6b57..9c6eaa5 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > > @@ -27,7 +27,11 @@ import org.apache.samza.task.TaskContext; > > import org.apache.samza.task.TaskCoordinator; > > > > > > +/** > > + * This class defines the common interface for operator classes. 
> > + */ > > public interface Operator { > > + > > /** > > * Method to initialize the operator > > * > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > > index fb2aa89..5a77d95 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > > @@ -23,7 +23,6 @@ import org.apache.samza.sql.api.data.Tuple; > > import org.apache.samza.task.MessageCollector; > > import org.apache.samza.task.TaskCoordinator; > > > > - > > /** > > * Defines the callback functions to allow customized functions to be > > invoked before process and before sending the result > > */ > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > > index 0759638..432e6b3 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > > @@ -19,6 +19,7 @@ > > > > package org.apache.samza.sql.api.operators; > > > > +import java.util.Iterator; > > import java.util.List; > > > > import org.apache.samza.sql.api.data.EntityName; > > @@ -51,4 +52,11 @@ public interface 
OperatorRouter extends Operator { > > */ > > List<SimpleOperator> getNextOperators(EntityName output); > > > > + /** > > + * This method provides an iterator to go through all operators > > connected via {@code OperatorRouter} > > + * > > + * @return An {@link java.util.Iterator} for all operators connected > > via {@code OperatorRouter} > > + */ > > + Iterator<SimpleOperator> iterator(); > > + > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java > > new file mode 100644 > > index 0000000..e2c748c > > --- /dev/null > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java > > @@ -0,0 +1,30 @@ > > +/* > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, > > + * software distributed under the License is distributed on an > > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > + * KIND, either express or implied. See the License for the > > + * specific language governing permissions and limitations > > + * under the License. 
> > + */ > > +package org.apache.samza.sql.api.operators; > > + > > +import java.util.Iterator; > > + > > +import org.apache.samza.sql.api.data.EntityName; > > + > > + > > +public interface OperatorSink { > > + Iterator<SimpleOperator> opIterator(); > > + > > + EntityName getName(); > > +} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java > > new file mode 100644 > > index 0000000..860c1aa > > --- /dev/null > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java > > @@ -0,0 +1,30 @@ > > +/* > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, > > + * software distributed under the License is distributed on an > > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > + * KIND, either express or implied. See the License for the > > + * specific language governing permissions and limitations > > + * under the License. 
> > + */ > > +package org.apache.samza.sql.api.operators; > > + > > +import java.util.Iterator; > > + > > +import org.apache.samza.sql.api.data.EntityName; > > + > > + > > +public interface OperatorSource { > > + Iterator<SimpleOperator> opIterator(); > > + > > + EntityName getName(); > > +} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > > index c49a822..60ace9c 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > > @@ -19,8 +19,6 @@ > > > > package org.apache.samza.sql.api.operators; > > > > - > > - > > /** > > * The interface for a {@code SimpleOperator} that implements a simple > > primitive relational logic operation > > */ > > @@ -31,4 +29,5 @@ public interface SimpleOperator extends Operator { > > * @return The {@link org.apache.samza.sql.api.operators.OperatorSpec} > > object that defines the configuration/parameters of the operator > > */ > > OperatorSpec getSpec(); > > + > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > > index 72a59f2..af040f0 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > > 
+++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > > @@ -81,7 +81,6 @@ public class IncomingMessageTuple implements Tuple { > > > > @Override > > public long getCreateTimeNano() { > > - // TODO: this is wrong and just to keep as an placeholder. It should > > be replaced by the message publish time when the publish timestamp is > > available in the message metadata > > return this.recvTimeNano; > > } > > > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java > > new file mode 100644 > > index 0000000..e951737 > > --- /dev/null > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java > > @@ -0,0 +1,53 @@ > > +/* > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, > > + * software distributed under the License is distributed on an > > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > + * KIND, either express or implied. See the License for the > > + * specific language governing permissions and limitations > > + * under the License. 
> > + */ > > +package org.apache.samza.sql.operators; > > + > > +import org.apache.samza.sql.api.data.Relation; > > +import org.apache.samza.sql.api.data.Tuple; > > +import org.apache.samza.sql.api.operators.OperatorCallback; > > +import org.apache.samza.task.MessageCollector; > > +import org.apache.samza.task.TaskCoordinator; > > + > > + > > +/** > > + * This is a default NOOP operator callback object that does nothing > > before and after the process method > > + */ > > +public final class NoopOperatorCallback implements OperatorCallback { > > + > > + @Override > > + public Tuple beforeProcess(Tuple tuple, MessageCollector collector, > > TaskCoordinator coordinator) { > > + return tuple; > > + } > > + > > + @Override > > + public Relation beforeProcess(Relation rel, MessageCollector > collector, > > TaskCoordinator coordinator) { > > + return rel; > > + } > > + > > + @Override > > + public Tuple afterProcess(Tuple tuple, MessageCollector collector, > > TaskCoordinator coordinator) { > > + return tuple; > > + } > > + > > + @Override > > + public Relation afterProcess(Relation rel, MessageCollector collector, > > TaskCoordinator coordinator) { > > + return rel; > > + } > > + > > +} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java > > new file mode 100644 > > index 0000000..8b70092 > > --- /dev/null > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java > > @@ -0,0 +1,53 @@ > > +/* > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. 
See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, > > + * software distributed under the License is distributed on an > > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > + * KIND, either express or implied. See the License for the > > + * specific language governing permissions and limitations > > + * under the License. > > + */ > > +package org.apache.samza.sql.operators; > > + > > +import java.util.Iterator; > > + > > +import org.apache.samza.sql.api.data.EntityName; > > +import org.apache.samza.sql.api.operators.OperatorSink; > > +import org.apache.samza.sql.api.operators.OperatorSource; > > +import org.apache.samza.sql.api.operators.SimpleOperator; > > + > > + > > +/** > > + * This class implements a partially completed {@link > > org.apache.samza.sql.operators.factory.TopologyBuilder} that signifies a > > partially completed > > + * topology that the current operator has unbounded input stream that > can > > be attached to other operators' output > > + */ > > +public class OperatorTopology implements OperatorSource, OperatorSink { > > + > > + private final EntityName name; > > + private final SimpleRouter router; > > + > > + public OperatorTopology(EntityName name, SimpleRouter router) { > > + this.name = name; > > + this.router = router; > > + } > > + > > + @Override > > + public Iterator<SimpleOperator> opIterator() { > > + return this.router.iterator(); > > + } > > + > > + @Override > > + public EntityName getName() { > > + return this.name; > > + } > > + > > +} > > > > > > > 
http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java > > new file mode 100644 > > index 0000000..423880b > > --- /dev/null > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java > > @@ -0,0 +1,147 @@ > > +/* > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, > > + * software distributed under the License is distributed on an > > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > + * KIND, either express or implied. See the License for the > > + * specific language governing permissions and limitations > > + * under the License. 
> > + */ > > + > > +package org.apache.samza.sql.operators; > > + > > +import org.apache.samza.sql.api.data.Relation; > > +import org.apache.samza.sql.api.data.Tuple; > > +import org.apache.samza.sql.api.operators.OperatorCallback; > > +import org.apache.samza.sql.api.operators.OperatorSpec; > > +import org.apache.samza.sql.api.operators.SimpleOperator; > > +import org.apache.samza.task.MessageCollector; > > +import org.apache.samza.task.TaskCoordinator; > > +import org.apache.samza.task.sql.SimpleMessageCollector; > > + > > + > > +/** > > + * An abstract class that encapsulate the basic information and methods > > that all operator classes should implement. > > + * It implements the interface {@link > > org.apache.samza.sql.api.operators.SimpleOperator} > > + */ > > +public abstract class SimpleOperatorImpl implements SimpleOperator { > > + /** > > + * The specification of this operator > > + */ > > + private final OperatorSpec spec; > > + > > + /** > > +<<<<<<< HEAD > > + * The callback object > > +======= > > + * The callback function > > +>>>>>>> SAMZA-552: use OperatorCallback to allow implementation of > > callbacks w/o inheriting and creating many sub-classes from operators > > + */ > > + private final OperatorCallback callback; > > + > > + /** > > + * Ctor of {@code SimpleOperatorImpl} class > > + * > > + * @param spec The specification of this operator > > + */ > > + public SimpleOperatorImpl(OperatorSpec spec) { > > + this(spec, new NoopOperatorCallback()); > > + } > > + > > + public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback > callback) > > { > > + this.spec = spec; > > + this.callback = callback; > > + } > > + > > + @Override > > + public OperatorSpec getSpec() { > > + return this.spec; > > + } > > + > > + /** > > + * This method is made final s.t. 
the sequence of invocations between > > {@link > > > org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation, > > MessageCollector, TaskCoordinator)} > > + * and real processing of the input relation is fixed. > > + */ > > + @Override > > + final public void process(Relation deltaRelation, MessageCollector > > collector, TaskCoordinator coordinator) > > + throws Exception { > > + Relation rel = this.callback.beforeProcess(deltaRelation, collector, > > coordinator); > > + if (rel == null) { > > + return; > > + } > > + this.realProcess(rel, getCollector(collector, coordinator), > > coordinator); > > + } > > + > > + /** > > + * This method is made final s.t. the sequence of invocations between > > {@link > > org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, > > MessageCollector, TaskCoordinator)} > > + * and real processing of the input tuple is fixed. > > + */ > > + @Override > > + final public void process(Tuple tuple, MessageCollector collector, > > TaskCoordinator coordinator) throws Exception { > > + Tuple ituple = this.callback.beforeProcess(tuple, collector, > > coordinator); > > + if (ituple == null) { > > + return; > > + } > > + this.realProcess(ituple, getCollector(collector, coordinator), > > coordinator); > > + } > > + > > + /** > > + * This method is made final s.t. 
we enforce the invocation of {@code > > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} > before > > doing anything futher > > + */ > > + @Override > > + final public void refresh(long timeNano, MessageCollector collector, > > TaskCoordinator coordinator) throws Exception { > > + this.realRefresh(timeNano, getCollector(collector, coordinator), > > coordinator); > > + } > > + > > + private SimpleMessageCollector getCollector(MessageCollector > collector, > > TaskCoordinator coordinator) { > > + if (!(collector instanceof SimpleMessageCollector)) { > > + return new SimpleMessageCollector(collector, coordinator, > > this.callback); > > + } else { > > + ((SimpleMessageCollector) > collector).switchCallback(this.callback); > > + return (SimpleMessageCollector) collector; > > + } > > + } > > + > > + /** > > + * Method to be overriden by each specific implementation class of > > operator to handle timeout event > > + * > > + * @param timeNano The time in nanosecond when the timeout event > > occurred > > + * @param collector The {@link > > org.apache.samza.task.sql.SimpleMessageCollector} in the context > > + * @param coordinator The {@link > org.apache.samza.task.TaskCoordinator} > > in the context > > + * @throws Exception Throws exception if failed to refresh the results > > + */ > > + protected abstract void realRefresh(long timeNano, > > SimpleMessageCollector collector, TaskCoordinator coordinator) > > + throws Exception; > > + > > + /** > > + * Method to be overriden by each specific implementation class of > > operator to perform relational logic operation on an input {@link > > org.apache.samza.sql.api.data.Relation} > > + * > > + * @param rel The input relation > > + * @param collector The {@link > > org.apache.samza.task.sql.SimpleMessageCollector} in the context > > + * @param coordinator The {@link > org.apache.samza.task.TaskCoordinator} > > in the context > > + * @throws Exception Throws exception if failed to process > > + */ > > + 
protected abstract void realProcess(Relation rel, > > SimpleMessageCollector collector, TaskCoordinator coordinator) > > + throws Exception; > > + > > + /** > > + * Method to be overriden by each specific implementation class of > > operator to perform relational logic operation on an input {@link > > org.apache.samza.sql.api.data.Tuple} > > + * > > + * @param ituple The input tuple > > + * @param collector The {@link > > org.apache.samza.task.sql.SimpleMessageCollector} in the context > > + * @param coordinator The {@link > org.apache.samza.task.TaskCoordinator} > > in the context > > + * @throws Exception Throws exception if failed to process > > + */ > > + protected abstract void realProcess(Tuple ituple, > > SimpleMessageCollector collector, TaskCoordinator coordinator) > > + throws Exception; > > + > > +} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java > > new file mode 100644 > > index 0000000..691e543 > > --- /dev/null > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java > > @@ -0,0 +1,106 @@ > > +/* > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. 
You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, > > + * software distributed under the License is distributed on an > > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > + * KIND, either express or implied. See the License for the > > + * specific language governing permissions and limitations > > + * under the License. > > + */ > > +package org.apache.samza.sql.operators; > > + > > +import java.util.ArrayList; > > +import java.util.List; > > + > > +import org.apache.samza.sql.api.data.EntityName; > > +import org.apache.samza.sql.api.operators.OperatorSpec; > > + > > + > > +/** > > + * An abstract class that encapsulate the basic information and methods > > that all specification of operators should implement. > > + * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec} > > + */ > > +public abstract class SimpleOperatorSpec implements OperatorSpec { > > + /** > > + * The identifier of the corresponding operator > > + */ > > + private final String id; > > + > > + /** > > + * The list of input entity names of the corresponding operator > > + */ > > + private final List<EntityName> inputs = new ArrayList<EntityName>(); > > + > > + /** > > + * The list of output entity names of the corresponding operator > > + */ > > + private final List<EntityName> outputs = new ArrayList<EntityName>(); > > + > > + /** > > + * Ctor of the {@code SimpleOperatorSpec} for simple {@link > > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one > > output > > + * > > + * @param id Unique identifier of the {@link > > org.apache.samza.sql.api.operators.SimpleOperator} object > > + * @param input The only input entity > > + * @param output The only output entity > > + */ > > + public SimpleOperatorSpec(String id, EntityName input, EntityName > > output) { > > + this.id = id; > > + this.inputs.add(input); > > + 
this.outputs.add(output); > > + } > > + > > + /** > > + * Ctor of {@code SimpleOperatorSpec} with general format: m inputs > and > > n outputs > > + * > > + * @param id Unique identifier of the {@link > > org.apache.samza.sql.api.operators.SimpleOperator} object > > + * @param inputs The list of input entities > > + * @param output The list of output entities > > + */ > > + public SimpleOperatorSpec(String id, List<EntityName> inputs, > > EntityName output) { > > + this.id = id; > > + this.inputs.addAll(inputs); > > + this.outputs.add(output); > > + } > > + > > + @Override > > + public String getId() { > > + return this.id; > > + } > > + > > + @Override > > + public List<EntityName> getInputNames() { > > + return this.inputs; > > + } > > + > > + @Override > > + public List<EntityName> getOutputNames() { > > + return this.outputs; > > + } > > + > > + /** > > + * Method to get the first output entity > > + * > > + * @return The first output entity name > > + */ > > + public EntityName getOutputName() { > > + return this.outputs.get(0); > > + } > > + > > + /** > > + * Method to get the first input entity > > + * > > + * @return The first input entity name > > + */ > > + public EntityName getInputName() { > > + return this.inputs.get(0); > > + } > > +} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java > > new file mode 100644 > > index 0000000..2d9a1db > > --- /dev/null > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java > > @@ -0,0 +1,141 @@ > > +/* > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. 
See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, > > + * software distributed under the License is distributed on an > > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > + * KIND, either express or implied. See the License for the > > + * specific language governing permissions and limitations > > + * under the License. > > + */ > > + > > +package org.apache.samza.sql.operators; > > + > > +import java.util.ArrayList; > > +import java.util.HashMap; > > +import java.util.HashSet; > > +import java.util.Iterator; > > +import java.util.List; > > +import java.util.Map; > > +import java.util.Set; > > + > > +import org.apache.samza.config.Config; > > +import org.apache.samza.sql.api.data.EntityName; > > +import org.apache.samza.sql.api.data.Relation; > > +import org.apache.samza.sql.api.data.Tuple; > > +import org.apache.samza.sql.api.operators.Operator; > > +import org.apache.samza.sql.api.operators.OperatorRouter; > > +import org.apache.samza.sql.api.operators.SimpleOperator; > > +import org.apache.samza.task.MessageCollector; > > +import org.apache.samza.task.TaskContext; > > +import org.apache.samza.task.TaskCoordinator; > > +import org.apache.samza.task.sql.RouterMessageCollector; > > + > > + > > +/** > > + * Example implementation of {@link > > org.apache.samza.sql.api.operators.OperatorRouter} > > + * > > + */ > > +public final class SimpleRouter implements OperatorRouter { > > + /** > > + * List of operators added to the {@link > > org.apache.samza.sql.api.operators.OperatorRouter} > > + */ > > + private List<SimpleOperator> operators 
= new > > ArrayList<SimpleOperator>(); > > + > > + @SuppressWarnings("rawtypes") > > + /** > > + * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list > > of operators associated with it > > + */ > > + private Map<EntityName, List> nextOps = new HashMap<EntityName, > List>(); > > + > > + /** > > + * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs > to > > this {@code SimpleRouter} > > + */ > > + private Set<EntityName> inputEntities = new HashSet<EntityName>(); > > + > > + /** > > + * Set of entities that are not input entities to this {@code > > SimpleRouter} > > + */ > > + private Set<EntityName> outputEntities = new HashSet<EntityName>(); > > + > > + @SuppressWarnings("unchecked") > > + private void addOperator(EntityName input, SimpleOperator nextOp) { > > + if (nextOps.get(input) == null) { > > + nextOps.put(input, new ArrayList<Operator>()); > > + } > > + nextOps.get(input).add(nextOp); > > + operators.add(nextOp); > > + // get the operator spec > > + for (EntityName output : nextOp.getSpec().getOutputNames()) { > > + if (inputEntities.contains(output)) { > > + inputEntities.remove(output); > > + } > > + outputEntities.add(output); > > + } > > + if (!outputEntities.contains(input)) { > > + inputEntities.add(input); > > + } > > + } > > + > > + @Override > > + @SuppressWarnings("unchecked") > > + public List<SimpleOperator> getNextOperators(EntityName entity) { > > + return nextOps.get(entity); > > + } > > + > > + @Override > > + public void addOperator(SimpleOperator nextOp) { > > + List<EntityName> inputs = nextOp.getSpec().getInputNames(); > > + for (EntityName input : inputs) { > > + addOperator(input, nextOp); > > + } > > + } > > + > > + @Override > > + public void init(Config config, TaskContext context) throws Exception > { > > + for (SimpleOperator op : this.operators) { > > + op.init(config, context); > > + } > > + } > > + > > + @Override > > + public void process(Tuple ituple, MessageCollector collector, > > 
TaskCoordinator coordinator) throws Exception { > > + MessageCollector opCollector = new RouterMessageCollector(collector, > > coordinator, this); > > + for (Iterator<SimpleOperator> iter = > > this.getNextOperators(ituple.getEntityName()).iterator(); > iter.hasNext();) { > > + iter.next().process(ituple, opCollector, coordinator); > > + } > > + } > > + > > + @SuppressWarnings("rawtypes") > > + @Override > > + public void process(Relation deltaRelation, MessageCollector > collector, > > TaskCoordinator coordinator) throws Exception { > > + MessageCollector opCollector = new RouterMessageCollector(collector, > > coordinator, this); > > + for (Iterator<SimpleOperator> iter = > > this.getNextOperators(deltaRelation.getName()).iterator(); > iter.hasNext();) > > { > > + iter.next().process(deltaRelation, opCollector, coordinator); > > + } > > + } > > + > > + @Override > > + public void refresh(long nanoSec, MessageCollector collector, > > TaskCoordinator coordinator) throws Exception { > > + MessageCollector opCollector = new RouterMessageCollector(collector, > > coordinator, this); > > + for (EntityName entity : inputEntities) { > > + for (Iterator<SimpleOperator> iter = > > this.getNextOperators(entity).iterator(); iter.hasNext();) { > > + iter.next().refresh(nanoSec, opCollector, coordinator); > > + } > > + } > > + } > > + > > + @Override > > + public Iterator<SimpleOperator> iterator() { > > + return this.operators.iterator(); > > + } > > + > > +} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java > > deleted file mode 100644 > > index c3d2266..0000000 > > --- > > 
> a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java > > +++ /dev/null > > @@ -1,50 +0,0 @@ > > -/* > > - * Licensed to the Apache Software Foundation (ASF) under one > > - * or more contributor license agreements. See the NOTICE file > > - * distributed with this work for additional information > > - * regarding copyright ownership. The ASF licenses this file > > - * to you under the Apache License, Version 2.0 (the > > - * "License"); you may not use this file except in compliance > > - * with the License. You may obtain a copy of the License at > > - * > > - * http://www.apache.org/licenses/LICENSE-2.0 > > - * > > - * Unless required by applicable law or agreed to in writing, > > - * software distributed under the License is distributed on an > > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > - * KIND, either express or implied. See the License for the > > - * specific language governing permissions and limitations > > - * under the License. 
> > - */ > > -package org.apache.samza.sql.operators.factory; > > - > > -import org.apache.samza.sql.api.data.Relation; > > -import org.apache.samza.sql.api.data.Tuple; > > -import org.apache.samza.sql.api.operators.OperatorCallback; > > -import org.apache.samza.task.MessageCollector; > > -import org.apache.samza.task.TaskCoordinator; > > - > > - > > -public final class NoopOperatorCallback implements OperatorCallback { > > - > > - @Override > > - public Tuple beforeProcess(Tuple tuple, MessageCollector collector, > > TaskCoordinator coordinator) { > > - return tuple; > > - } > > - > > - @Override > > - public Relation beforeProcess(Relation rel, MessageCollector > collector, > > TaskCoordinator coordinator) { > > - return rel; > > - } > > - > > - @Override > > - public Tuple afterProcess(Tuple tuple, MessageCollector collector, > > TaskCoordinator coordinator) { > > - return tuple; > > - } > > - > > - @Override > > - public Relation afterProcess(Relation rel, MessageCollector collector, > > TaskCoordinator coordinator) { > > - return rel; > > - } > > - > > -} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java > > deleted file mode 100644 > > index e66451f..0000000 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java > > +++ /dev/null > > @@ -1,136 +0,0 @@ > > -/* > > - * Licensed to the Apache Software Foundation (ASF) under one > > - * or more contributor license agreements. See the NOTICE file > > - * distributed with this work for additional information > > - * regarding copyright ownership. 
The ASF licenses this file > > - * to you under the Apache License, Version 2.0 (the > > - * "License"); you may not use this file except in compliance > > - * with the License. You may obtain a copy of the License at > > - * > > - * http://www.apache.org/licenses/LICENSE-2.0 > > - * > > - * Unless required by applicable law or agreed to in writing, > > - * software distributed under the License is distributed on an > > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > - * KIND, either express or implied. See the License for the > > - * specific language governing permissions and limitations > > - * under the License. > > - */ > > - > > -package org.apache.samza.sql.operators.factory; > > - > > -import org.apache.samza.sql.api.data.Relation; > > -import org.apache.samza.sql.api.data.Tuple; > > -import org.apache.samza.sql.api.operators.OperatorCallback; > > -import org.apache.samza.sql.api.operators.OperatorSpec; > > -import org.apache.samza.sql.api.operators.SimpleOperator; > > -import org.apache.samza.task.MessageCollector; > > -import org.apache.samza.task.TaskCoordinator; > > -import org.apache.samza.task.sql.SimpleMessageCollector; > > - > > - > > -/** > > - * An abstract class that encapsulate the basic information and methods > > that all operator classes should implement. 
> > - * It implements the interface {@link > > org.apache.samza.sql.api.operators.SimpleOperator} > > - * > > - */ > > -public abstract class SimpleOperatorImpl implements SimpleOperator { > > - /** > > - * The specification of this operator > > - */ > > - private final OperatorSpec spec; > > - > > - /** > > - * The callback function > > - */ > > - private final OperatorCallback callback; > > - > > - /** > > - * Ctor of {@code SimpleOperatorImpl} class > > - * > > - * @param spec The specification of this operator > > - */ > > - public SimpleOperatorImpl(OperatorSpec spec) { > > - this(spec, new NoopOperatorCallback()); > > - } > > - > > - public SimpleOperatorImpl(OperatorSpec spec, OperatorCallback > callback) > > { > > - this.spec = spec; > > - this.callback = callback; > > - } > > - > > - @Override > > - public OperatorSpec getSpec() { > > - return this.spec; > > - } > > - > > - /** > > - * This method is made final s.t. the sequence of invocations between > > {@link > > > org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Relation, > > MessageCollector, TaskCoordinator)} > > - * and real processing of the input relation is fixed. > > - */ > > - @Override > > - final public void process(Relation deltaRelation, MessageCollector > > collector, TaskCoordinator coordinator) > > - throws Exception { > > - Relation rel = this.callback.beforeProcess(deltaRelation, collector, > > coordinator); > > - if (rel == null) { > > - return; > > - } > > - this.realProcess(rel, getCollector(collector, coordinator), > > coordinator); > > - } > > - > > - /** > > - * This method is made final s.t. the sequence of invocations between > > {@link > > org.apache.samza.sql.api.operators.OperatorCallback#beforeProcess(Tuple, > > MessageCollector, TaskCoordinator)} > > - * and real processing of the input tuple is fixed. 
> > - */ > > - @Override > > - final public void process(Tuple tuple, MessageCollector collector, > > TaskCoordinator coordinator) throws Exception { > > - Tuple ituple = this.callback.beforeProcess(tuple, collector, > > coordinator); > > - if (ituple == null) { > > - return; > > - } > > - this.realProcess(ituple, getCollector(collector, coordinator), > > coordinator); > > - } > > - > > - /** > > - * This method is made final s.t. we enforce the invocation of {@code > > SimpleOperatorImpl#getCollector(MessageCollector, TaskCoordinator)} > before > > doing anything futher > > - */ > > - @Override > > - final public void refresh(long timeNano, MessageCollector collector, > > TaskCoordinator coordinator) throws Exception { > > - this.realRefresh(timeNano, getCollector(collector, coordinator), > > coordinator); > > - } > > - > > - private SimpleMessageCollector getCollector(MessageCollector > collector, > > TaskCoordinator coordinator) { > > - if (!(collector instanceof SimpleMessageCollector)) { > > - return new SimpleMessageCollector(collector, coordinator, > > this.callback); > > - } else { > > - ((SimpleMessageCollector) > > collector).switchOperatorCallback(this.callback); > > - return (SimpleMessageCollector) collector; > > - } > > - } > > - > > - /** > > - * Method to be overriden by each specific implementation class of > > operator to handle timeout event > > - * > > - * @param timeNano The time in nanosecond when the timeout event > > occurred > > - * @param collector The {@link > > org.apache.samza.task.sql.SimpleMessageCollector} in the context > > - * @param coordinator The {@link > org.apache.samza.task.TaskCoordinator} > > in the context > > - * @throws Exception Throws exception if failed to refresh the results > > - */ > > - protected abstract void realRefresh(long timeNano, > > SimpleMessageCollector collector, TaskCoordinator coordinator) > > - throws Exception; > > - > > - /** > > - * Method to be overriden by each specific implementation class of > 
> operator to perform relational logic operation on an input {@link > > org.apache.samza.sql.api.data.Relation} > > - * > > - * @param rel The input relation > > - * @param collector The {@link > > org.apache.samza.task.sql.SimpleMessageCollector} in the context > > - * @param coordinator The {@link > org.apache.samza.task.TaskCoordinator} > > in the context > > - * @throws Exception > > - */ > > - protected abstract void realProcess(Relation rel, > > SimpleMessageCollector collector, TaskCoordinator coordinator) > > - throws Exception; > > - > > - protected abstract void realProcess(Tuple ituple, > > SimpleMessageCollector collector, TaskCoordinator coordinator) > > - throws Exception; > > - > > -} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java > > deleted file mode 100644 > > index 56753b6..0000000 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java > > +++ /dev/null > > @@ -1,106 +0,0 @@ > > -/* > > - * Licensed to the Apache Software Foundation (ASF) under one > > - * or more contributor license agreements. See the NOTICE file > > - * distributed with this work for additional information > > - * regarding copyright ownership. The ASF licenses this file > > - * to you under the Apache License, Version 2.0 (the > > - * "License"); you may not use this file except in compliance > > - * with the License. 
You may obtain a copy of the License at > > - * > > - * http://www.apache.org/licenses/LICENSE-2.0 > > - * > > - * Unless required by applicable law or agreed to in writing, > > - * software distributed under the License is distributed on an > > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > - * KIND, either express or implied. See the License for the > > - * specific language governing permissions and limitations > > - * under the License. > > - */ > > -package org.apache.samza.sql.operators.factory; > > - > > -import java.util.ArrayList; > > -import java.util.List; > > - > > -import org.apache.samza.sql.api.data.EntityName; > > -import org.apache.samza.sql.api.operators.OperatorSpec; > > - > > - > > -/** > > - * An abstract class that encapsulate the basic information and methods > > that all specification of operators should implement. > > - * It implements {@link org.apache.samza.sql.api.operators.OperatorSpec} > > - */ > > -public abstract class SimpleOperatorSpec implements OperatorSpec { > > - /** > > - * The identifier of the corresponding operator > > - */ > > - private final String id; > > - > > - /** > > - * The list of input entity names of the corresponding operator > > - */ > > - private final List<EntityName> inputs = new ArrayList<EntityName>(); > > - > > - /** > > - * The list of output entity names of the corresponding operator > > - */ > > - private final List<EntityName> outputs = new ArrayList<EntityName>(); > > - > > - /** > > - * Ctor of the {@code SimpleOperatorSpec} for simple {@link > > org.apache.samza.sql.api.operators.SimpleOperator}s w/ one input and one > > output > > - * > > - * @param id Unique identifier of the {@link > > org.apache.samza.sql.api.operators.SimpleOperator} object > > - * @param input The only input entity > > - * @param output The only output entity > > - */ > > - public SimpleOperatorSpec(String id, EntityName input, EntityName > > output) { > > - this.id = id; > > - this.inputs.add(input); > > - 
this.outputs.add(output); > > - } > > - > > - /** > > - * Ctor of {@code SimpleOperatorSpec} with general format: m inputs > and > > n outputs > > - * > > - * @param id Unique identifier of the {@link > > org.apache.samza.sql.api.operators.SimpleOperator} object > > - * @param inputs The list of input entities > > - * @param output The list of output entities > > - */ > > - public SimpleOperatorSpec(String id, List<EntityName> inputs, > > EntityName output) { > > - this.id = id; > > - this.inputs.addAll(inputs); > > - this.outputs.add(output); > > - } > > - > > - @Override > > - public String getId() { > > - return this.id; > > - } > > - > > - @Override > > - public List<EntityName> getInputNames() { > > - return this.inputs; > > - } > > - > > - @Override > > - public List<EntityName> getOutputNames() { > > - return this.outputs; > > - } > > - > > - /** > > - * Method to get the first output entity > > - * > > - * @return The first output entity name > > - */ > > - public EntityName getOutputName() { > > - return this.outputs.get(0); > > - } > > - > > - /** > > - * Method to get the first input entity > > - * > > - * @return The first input entity name > > - */ > > - public EntityName getInputName() { > > - return this.inputs.get(0); > > - } > > -} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java > > deleted file mode 100644 > > index e570897..0000000 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java > > +++ /dev/null > > @@ -1,136 +0,0 @@ > > -/* > > - * Licensed to the Apache Software Foundation (ASF) under one > > - * or more 
contributor license agreements. See the NOTICE file > > - * distributed with this work for additional information > > - * regarding copyright ownership. The ASF licenses this file > > - * to you under the Apache License, Version 2.0 (the > > - * "License"); you may not use this file except in compliance > > - * with the License. You may obtain a copy of the License at > > - * > > - * http://www.apache.org/licenses/LICENSE-2.0 > > - * > > - * Unless required by applicable law or agreed to in writing, > > - * software distributed under the License is distributed on an > > - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > - * KIND, either express or implied. See the License for the > > - * specific language governing permissions and limitations > > - * under the License. > > - */ > > - > > -package org.apache.samza.sql.operators.factory; > > - > > -import java.util.ArrayList; > > -import java.util.HashMap; > > -import java.util.HashSet; > > -import java.util.Iterator; > > -import java.util.List; > > -import java.util.Map; > > -import java.util.Set; > > - > > -import org.apache.samza.config.Config; > > -import org.apache.samza.sql.api.data.EntityName; > > -import org.apache.samza.sql.api.data.Relation; > > -import org.apache.samza.sql.api.data.Tuple; > > -import org.apache.samza.sql.api.operators.Operator; > > -import org.apache.samza.sql.api.operators.OperatorRouter; > > -import org.apache.samza.sql.api.operators.SimpleOperator; > > -import org.apache.samza.task.MessageCollector; > > -import org.apache.samza.task.TaskContext; > > -import org.apache.samza.task.TaskCoordinator; > > -import org.apache.samza.task.sql.RouterMessageCollector; > > - > > - > > -/** > > - * Example implementation of {@link > > org.apache.samza.sql.api.operators.OperatorRouter} > > - * > > - */ > > -public final class SimpleRouter implements OperatorRouter { > > - /** > > - * List of operators added to the {@link > > org.apache.samza.sql.api.operators.OperatorRouter} > > - */ > > - 
private List<SimpleOperator> operators = new > > ArrayList<SimpleOperator>(); > > - > > - @SuppressWarnings("rawtypes") > > - /** > > - * Map of {@link org.apache.samza.sql.api.data.EntityName} to the list > > of operators associated with it > > - */ > > - private Map<EntityName, List> nextOps = new HashMap<EntityName, > List>(); > > - > > - /** > > - * Set of {@link org.apache.samza.sql.api.data.EntityName} as inputs > to > > this {@code SimpleRouter} > > - */ > > - private Set<EntityName> inputEntities = new HashSet<EntityName>(); > > - > > - /** > > - * Set of entities that are not input entities to this {@code > > SimpleRouter} > > - */ > > - private Set<EntityName> outputEntities = new HashSet<EntityName>(); > > - > > - @SuppressWarnings("unchecked") > > - private void addOperator(EntityName input, SimpleOperator nextOp) { > > - if (nextOps.get(input) == null) { > > - nextOps.put(input, new ArrayList<Operator>()); > > - } > > - nextOps.get(input).add(nextOp); > > - operators.add(nextOp); > > - // get the operator spec > > - for (EntityName output : nextOp.getSpec().getOutputNames()) { > > - if (inputEntities.contains(output)) { > > - inputEntities.remove(output); > > - } > > - outputEntities.add(output); > > - } > > - if (!outputEntities.contains(input)) { > > - inputEntities.add(input); > > - } > > - } > > - > > - @Override > > - @SuppressWarnings("unchecked") > > - public List<SimpleOperator> getNextOperators(EntityName entity) { > > - return nextOps.get(entity); > > - } > > - > > - @Override > > - public void addOperator(SimpleOperator nextOp) { > > - List<EntityName> inputs = nextOp.getSpec().getInputNames(); > > - for (EntityName input : inputs) { > > - addOperator(input, nextOp); > > - } > > - } > > - > > - @Override > > - public void init(Config config, TaskContext context) throws Exception > { > > - for (SimpleOperator op : this.operators) { > > - op.init(config, context); > > - } > > - } > > - > > - @Override > > - public void process(Tuple ituple, 
MessageCollector collector, > > TaskCoordinator coordinator) throws Exception { > > - MessageCollector opCollector = new RouterMessageCollector(collector, > > coordinator, this); > > - for (Iterator<SimpleOperator> iter = > > this.getNextOperators(ituple.getEntityName()).iterator(); > iter.hasNext();) { > > - iter.next().process(ituple, opCollector, coordinator); > > - } > > - } > > - > > - @SuppressWarnings("rawtypes") > > - @Override > > - public void process(Relation deltaRelation, MessageCollector > collector, > > TaskCoordinator coordinator) throws Exception { > > - MessageCollector opCollector = new RouterMessageCollector(collector, > > coordinator, this); > > - for (Iterator<SimpleOperator> iter = > > this.getNextOperators(deltaRelation.getName()).iterator(); > iter.hasNext();) > > { > > - iter.next().process(deltaRelation, opCollector, coordinator); > > - } > > - } > > - > > - @Override > > - public void refresh(long nanoSec, MessageCollector collector, > > TaskCoordinator coordinator) throws Exception { > > - MessageCollector opCollector = new RouterMessageCollector(collector, > > coordinator, this); > > - for (EntityName entity : inputEntities) { > > - for (Iterator<SimpleOperator> iter = > > this.getNextOperators(entity).iterator(); iter.hasNext();) { > > - iter.next().refresh(nanoSec, opCollector, coordinator); > > - } > > - } > > - } > > - > > -} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java > > new file mode 100644 > > index 0000000..62b19fc > > --- /dev/null > > +++ > > > 
b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java > > @@ -0,0 +1,284 @@ > > +/* > > + * Licensed to the Apache Software Foundation (ASF) under one > > + * or more contributor license agreements. See the NOTICE file > > + * distributed with this work for additional information > > + * regarding copyright ownership. The ASF licenses this file > > + * to you under the Apache License, Version 2.0 (the > > + * "License"); you may not use this file except in compliance > > + * with the License. You may obtain a copy of the License at > > + * > > + * http://www.apache.org/licenses/LICENSE-2.0 > > + * > > + * Unless required by applicable law or agreed to in writing, > > + * software distributed under the License is distributed on an > > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > + * KIND, either express or implied. See the License for the > > + * specific language governing permissions and limitations > > + * under the License. > > + */ > > +package org.apache.samza.sql.operators.factory; > > + > > +import java.util.HashMap; > > +import java.util.HashSet; > > +import java.util.Iterator; > > +import java.util.List; > > +import java.util.Map; > > +import java.util.Set; > > + > > +import org.apache.samza.sql.api.data.EntityName; > > +import org.apache.samza.sql.api.operators.OperatorRouter; > > +import org.apache.samza.sql.api.operators.OperatorSink; > > +import org.apache.samza.sql.api.operators.OperatorSource; > > +import org.apache.samza.sql.api.operators.OperatorSpec; > > +import org.apache.samza.sql.api.operators.SimpleOperator; > > +import org.apache.samza.sql.api.operators.SqlOperatorFactory; > > +import org.apache.samza.sql.operators.OperatorTopology; > > +import org.apache.samza.sql.operators.SimpleRouter; > > + > > + > > +/** > > + * This class implements a builder to allow user to create the operators > > and connect them in a topology altogether. 
> > + */ > > +public class TopologyBuilder { > > + > > + /** > > + * Internal {@link org.apache.samza.sql.api.operators.OperatorRouter} > > object to retain the topology being created > > + */ > > + private SimpleRouter router; > > + > > + /** > > + * The {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} > > object used to create operators connected in the topology > > + */ > > + private final SqlOperatorFactory factory; > > + > > + /** > > + * The map of unbound inputs, the value is set(input_operators) > > + */ > > + private Map<EntityName, Set<OperatorSpec>> unboundInputs = new > > HashMap<EntityName, Set<OperatorSpec>>(); > > + > > + /** > > + * The map of unbound outputs, the value is the operator generating > the > > output > > + */ > > + private Map<EntityName, OperatorSpec> unboundOutputs = new > > HashMap<EntityName, OperatorSpec>(); > > + > > + /** > > + * The set of entities that are intermediate entities between > operators > > + */ > > + private Set<EntityName> interStreams = new HashSet<EntityName>(); > > + > > + /** > > + * The current operator that may have unbound input or output > > + */ > > + private SimpleOperator currentOp = null; > > + > > + /** > > + * Private constructor of {@code TopologyBuilder} > > + * > > + * @param factory The {@link > > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create > operators > > + */ > > + private TopologyBuilder(SqlOperatorFactory factory) { > > + this.router = new SimpleRouter(); > > + this.factory = factory; > > + } > > + > > + /** > > + * Static method to create this {@code TopologyBuilder} w/ a > customized > > {@link org.apache.samza.sql.api.operators.SqlOperatorFactory} > > + * > > + * @param factory The {@link > > org.apache.samza.sql.api.operators.SqlOperatorFactory} to create > operators > > + * @return The {@code TopologyBuilder} object > > + */ > > + public static TopologyBuilder create(SqlOperatorFactory factory) { > > + return new TopologyBuilder(factory); > > + } > > + > 
> + /** > > + * Static method to create this {@code TopologyBuilder} > > + * > > + * @return The {@code TopologyBuilder} object > > + */ > > + public static TopologyBuilder create() { > > + return new TopologyBuilder(new SimpleOperatorFactoryImpl()); > > + } > > + > > + /** > > + * Public method to create the next operator and attach it to the > > output of the current operator > > + * > > + * @param spec The {@link > > org.apache.samza.sql.api.operators.OperatorSpec} for the next operator > > + * @return The updated {@code TopologyBuilder} object > > + */ > > + public TopologyBuilder operator(OperatorSpec spec) { > > + // check whether it is valid to connect a new operator to the > current > > operator's output > > + SimpleOperator nextOp = this.factory.getOperator(spec); > > + return this.operator(nextOp); > > + } > > + > > + /** > > + * Public method to create the next operator and attach it to the > > output of the current operator > > + * > > + * @param op The {@link > > org.apache.samza.sql.api.operators.SimpleOperator} > > + * @return The updated {@code TopologyBuilder} object > > + */ > > + public TopologyBuilder operator(SimpleOperator op) { > > + // check whether it is valid to connect a new operator to the > current > > operator's output > > + canAddOperator(op); > > + this.addOperator(op); > > + // advance the current operator position > > + this.currentOp = op; > > + return this; > > + } > > + > > + /** > > + * Public method to create a stream object that will be the source to > > other operators > > + * > > + * @return The {@link > > org.apache.samza.sql.api.operators.OperatorSource} that can be the source > > to other operators > > + */ > > + public OperatorSource stream() { > > + canCreateSource(); > > + return new > > OperatorTopology(this.unboundOutputs.keySet().iterator().next(), > > this.router); > > + } > > + > > + /** > > + * Public method to create a sink object that can take input stream > > from other operators > > + * > > + * @return The 
{@link org.apache.samza.sql.api.operators.OperatorSink} > > that can be the downstream of other operators > > + */ > > + public OperatorSink sink() { > > + canCreateSink(); > > + return new > > OperatorTopology(this.unboundInputs.keySet().iterator().next(), > > this.router); > > + } > > + > > + /** > > + * Public method to bind the input of the current operator w/ the > > {@link org.apache.samza.sql.api.operators.OperatorSource} object > > + * > > + * @param srcStream The {@link > > org.apache.samza.sql.api.operators.OperatorSource} that the current > > operator is going to be bound to > > + * @return The updated {@code TopologyBuilder} object > > + */ > > + public TopologyBuilder bind(OperatorSource srcStream) { > > + EntityName streamName = srcStream.getName(); > > + if (this.unboundInputs.containsKey(streamName)) { > > + this.unboundInputs.remove(streamName); > > + this.interStreams.add(streamName); > > + } else { > > + // no input operator is waiting for the output from the srcStream > > + throw new IllegalArgumentException("No operator input can be bound > > to the input stream " + streamName); > > + } > > + // add all operators in srcStream to this topology > > + for (Iterator<SimpleOperator> iter = srcStream.opIterator(); > > iter.hasNext();) { > > + this.addOperator(iter.next()); > > + } > > + return this; > > + } > > + > > + /** > > + * Public method to attach a {@link > > org.apache.samza.sql.api.operators.OperatorSink} object to the output of > > the current operator > > + * > > + * @param nextSink The {@link > > org.apache.samza.sql.api.operators.OperatorSink} to be attached to the > > current operator's output > > + * @return The updated {@code TopologyBuilder} object > > + */ > > + public TopologyBuilder attach(OperatorSink nextSink) { > > + EntityName streamName = nextSink.getName(); > > + if (this.unboundOutputs.containsKey(streamName)) { > > + this.unboundOutputs.remove(streamName); > > + this.interStreams.add(streamName); > > + } else { > > + // 
no unbound output to attach to > > + throw new IllegalArgumentException("No operator output found to > > attach the sink " + streamName); > > + } > > + // add all operators in nextSink to the router > > + for (Iterator<SimpleOperator> iter = nextSink.opIterator(); > > iter.hasNext();) { > > + this.addOperator(iter.next()); > > + } > > + return this; > > + } > > + > > + /** > > + * Public method to finalize the topology that should have all input > > and output bound to system input and output > > + * > > + * @return The finalized {@link > > org.apache.samza.sql.api.operators.OperatorRouter} object > > + */ > > + public OperatorRouter build() { > > + canClose(); > > + return router; > > + } > > + > > + private TopologyBuilder addOperator(SimpleOperator nextOp) { > > + // if input is not in the unboundOutputs and interStreams, input is > > unbound > > + for (EntityName in : nextOp.getSpec().getInputNames()) { > > + if (this.unboundOutputs.containsKey(in)) { > > + this.unboundOutputs.remove(in); > > + this.interStreams.add(in); > > + } > > + if (!this.interStreams.contains(in) && !in.isSystemEntity()) { > > + if (!this.unboundInputs.containsKey(in)) { > > + this.unboundInputs.put(in, new HashSet<OperatorSpec>()); > > + } > > + this.unboundInputs.get(in).add(nextOp.getSpec()); > > + } > > + } > > + // if output is not in the unboundInputs and interStreams, output is > > unbound > > + for (EntityName out : nextOp.getSpec().getOutputNames()) { > > + if (this.unboundInputs.containsKey(out)) { > > + this.unboundInputs.remove(out); > > + this.interStreams.add(out); > > + } > > + if (!this.interStreams.contains(out) && !out.isSystemEntity()) { > > + this.unboundOutputs.put(out, nextOp.getSpec()); > > + } > > + } > > + try { > > + this.router.addOperator(nextOp); > > + } catch (Exception e) { > > + throw new RuntimeException("Failed to add operator " + > > nextOp.getSpec().getId() + " to the topology.", e); > > + } > > + return this; > > + } > > + > > + private void 
canCreateSource() { > > + if (this.unboundInputs.size() > 0) { > > + throw new IllegalStateException("Can't create stream when there > are > > unbounded input streams in the topology"); > > + } > > + if (this.unboundOutputs.size() != 1) { > > + throw new IllegalStateException( > > + "Can't create stream when the number of unbounded outputs is > > not 1 in the topology"); > > + } > > + } > > + > > + private void canCreateSink() { > > + if (this.unboundOutputs.size() > 0) { > > + throw new IllegalStateException("Can't create sink when there are > > unbounded output streams in the topology"); > > + } > > + if (this.unboundInputs.size() != 1) { > > + throw new IllegalStateException( > > + "Can't create sink when the number of unbounded input streams > > is not 1 in the topology"); > > + } > > + } > > + > > + private void canAddOperator(SimpleOperator op) { > > + if (this.currentOp == null) { > > + return; > > + } > > + for (EntityName name : this.currentOp.getSpec().getInputNames()) { > > + if (this.unboundInputs.containsKey(name)) { > > + throw new IllegalArgumentException("There are unbound input " + > > name + " to the current operator " > > + + this.currentOp.getSpec().getId() + ". Create a sink or > call > > bind instead"); > > + } > > + } > > + List<EntityName> nextInputs = op.getSpec().getInputNames(); > > + for (EntityName name : this.currentOp.getSpec().getOutputNames()) { > > + if (!nextInputs.contains(name) && > > this.unboundOutputs.containsKey(name)) { > > + // the current operator's output is not in the next operator's > > input list > > + throw new IllegalArgumentException("There are unbound output " + > > name + " from the current operator " > > + + this.currentOp.getSpec().getId() > > + + " that are not included in the next operator's inputs. 
> > Create a stream or call attach instead"); > > + } > > + } > > + } > > + > > + private void canClose() { > > + if (!this.unboundInputs.isEmpty() || > !this.unboundOutputs.isEmpty()) { > > + throw new IllegalStateException( > > + "There are input/output streams in the topology that are not > > bounded. Can't build the topology yet."); > > + } > > + } > > + > > +} > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java > > index 2854aeb..7f5b990 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java > > @@ -29,7 +29,7 @@ import org.apache.samza.sql.api.data.Relation; > > import org.apache.samza.sql.api.data.Stream; > > import org.apache.samza.sql.api.data.Tuple; > > import org.apache.samza.sql.api.operators.OperatorCallback; > > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; > > +import org.apache.samza.sql.operators.SimpleOperatorImpl; > > import org.apache.samza.sql.operators.window.BoundedTimeWindow; > > import org.apache.samza.sql.window.storage.OrderedStoreKey; > > import org.apache.samza.storage.kv.Entry; > > @@ -38,7 +38,6 @@ import org.apache.samza.task.TaskContext; > > import org.apache.samza.task.TaskCoordinator; > > import org.apache.samza.task.sql.SimpleMessageCollector; > > > > - > > /** > > * This class implements a simple stream-to-stream join > > */ > > > > > > > 
http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java > > index cc0aca0..eecff7e 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java > > @@ -19,10 +19,11 @@ > > > > package org.apache.samza.sql.operators.join; > > > > +import java.util.ArrayList; > > import java.util.List; > > > > import org.apache.samza.sql.api.data.EntityName; > > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; > > +import org.apache.samza.sql.operators.SimpleOperatorSpec; > > > > > > /** > > @@ -35,4 +36,16 @@ public class StreamStreamJoinSpec extends > > SimpleOperatorSpec { > > // TODO Auto-generated constructor stub > > } > > > > + @SuppressWarnings("serial") > > + public StreamStreamJoinSpec(String id, List<String> inputRelations, > > String output, List<String> joinKeys) { > > + super(id, new ArrayList<EntityName>() { > > + { > > + for (String input : inputRelations) { > > + add(EntityName.getStreamName(input)); > > + } > > + } > > + }, EntityName.getStreamName(output)); > > + // TODO Auto-generated constructor stub > > + } > > + > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java > > > 
b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java > > index b93d789..0cba39a 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java > > @@ -23,7 +23,7 @@ import org.apache.samza.config.Config; > > import org.apache.samza.sql.api.data.Relation; > > import org.apache.samza.sql.api.data.Tuple; > > import org.apache.samza.sql.api.operators.OperatorCallback; > > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; > > +import org.apache.samza.sql.operators.SimpleOperatorImpl; > > import org.apache.samza.storage.kv.Entry; > > import org.apache.samza.storage.kv.KeyValueIterator; > > import org.apache.samza.system.OutgoingMessageEnvelope; > > @@ -32,7 +32,6 @@ import org.apache.samza.task.TaskContext; > > import org.apache.samza.task.TaskCoordinator; > > import org.apache.samza.task.sql.SimpleMessageCollector; > > > > - > > /** > > * This is an example build-in operator that performs a simple stream > > re-partition operation. 
> > * > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java > > index c47eed9..e494bff 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java > > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.partition; > > > > import org.apache.samza.sql.api.data.EntityName; > > import org.apache.samza.sql.api.operators.OperatorSpec; > > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; > > +import org.apache.samza.sql.operators.SimpleOperatorSpec; > > import org.apache.samza.system.SystemStream; > > > > > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java > > index d81cc93..a9a83b5 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java > > @@ -27,13 +27,12 @@ import org.apache.samza.sql.api.data.EntityName; > > import org.apache.samza.sql.api.data.Relation; > > import org.apache.samza.sql.api.data.Tuple; > > import 
org.apache.samza.sql.api.operators.OperatorCallback; > > -import org.apache.samza.sql.operators.factory.SimpleOperatorImpl; > > +import org.apache.samza.sql.operators.SimpleOperatorImpl; > > import org.apache.samza.storage.kv.KeyValueIterator; > > import org.apache.samza.task.TaskContext; > > import org.apache.samza.task.TaskCoordinator; > > import org.apache.samza.task.sql.SimpleMessageCollector; > > > > - > > /** > > * This class defines an example build-in operator for a fixed size > > window operator that converts a stream to a relation > > * > > @@ -86,6 +85,7 @@ public class BoundedTimeWindow extends > > SimpleOperatorImpl { > > * @param lengthSec The window size in seconds > > * @param input The input stream name > > * @param output The output relation name > > + * @param callback The user callback object > > */ > > public BoundedTimeWindow(String wndId, int lengthSec, String input, > > String output, OperatorCallback callback) { > > super(new WindowSpec(wndId, EntityName.getStreamName(input), > > EntityName.getStreamName(output), lengthSec), callback); > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java > > index eec32ea..6c4eba8 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java > > @@ -21,7 +21,7 @@ package org.apache.samza.sql.operators.window; > > > > import org.apache.samza.sql.api.data.EntityName; > > import org.apache.samza.sql.api.operators.OperatorSpec; > > -import org.apache.samza.sql.operators.factory.SimpleOperatorSpec; > > 
+import org.apache.samza.sql.operators.SimpleOperatorSpec; > > > > > > /** > > @@ -47,6 +47,11 @@ public class WindowSpec extends SimpleOperatorSpec > > implements OperatorSpec { > > this.wndSizeSec = lengthSec; > > } > > > > + public WindowSpec(String id, int wndSize, String input) { > > + super(id, EntityName.getStreamName(input), null); > > + this.wndSizeSec = wndSize; > > + } > > + > > /** > > * Method to get the window state relation name > > * > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java > > > b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java > > index b29838a..6950f67 100644 > > --- > > > a/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java > > +++ > > > b/samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java > > @@ -22,7 +22,7 @@ package org.apache.samza.task.sql; > > import org.apache.samza.sql.api.data.Relation; > > import org.apache.samza.sql.api.data.Tuple; > > import org.apache.samza.sql.api.operators.OperatorCallback; > > -import org.apache.samza.sql.operators.factory.NoopOperatorCallback; > > +import org.apache.samza.sql.operators.NoopOperatorCallback; > > import org.apache.samza.storage.kv.Entry; > > import org.apache.samza.storage.kv.KeyValueIterator; > > import org.apache.samza.system.OutgoingMessageEnvelope; > > @@ -57,25 +57,38 @@ public class SimpleMessageCollector implements > > MessageCollector { > > * @param coordinator The {@link > org.apache.samza.task.TaskCoordinator} > > in the context > > */ > > public SimpleMessageCollector(MessageCollector collector, > > TaskCoordinator coordinator) { > > - this.collector = collector; > > - this.coordinator = 
coordinator; > > + this(collector, coordinator, new NoopOperatorCallback()); > > } > > > > /** > > * This method swaps the {@code callback} with the new one > > * > > - * <p> This method allows the {@link > > org.apache.samza.sql.api.operators.SimpleOperator} to be swapped when the > > collector > > - * is passed down into the next operator's context. Hence, under the > > new operator's context, the correct {@link > > > org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Relation, > > MessageCollector, TaskCoordinator)}, > > - * and {@link > > org.apache.samza.sql.api.operators.OperatorCallback#afterProcess(Tuple, > > MessageCollector, TaskCoordinator)} can be invoked > > + * <p> This method allows the {@link > > org.apache.samza.sql.api.operators.OperatorCallback} to be swapped when > the > > collector > > + * is passed down into the next operator's context. Hence, under the > > new operator's context, the correct callback functions can be invoked > > * > > * @param callback The new {@link > > org.apache.samza.sql.api.operators.OperatorCallback} to be set > > */ > > - public void switchOperatorCallback(OperatorCallback callback) { > > - this.callback = callback; > > + public void switchCallback(OperatorCallback callback) { > > + if (callback == null) { > > + this.callback = new NoopOperatorCallback(); > > + } else { > > + this.callback = callback; > > + } > > + } > > + > > + /** > > + * Method is declared to be final s.t. we enforce that the callback > > functions are called first > > + */ > > + @Override > > + final public void send(OutgoingMessageEnvelope envelope) { > > + this.collector.send(envelope); > > } > > > > /** > > * Method is declared to be final s.t. 
we enforce that the callback > > functions are called first > > + * > > + * @param deltaRelation The relation to be sent out > > + * @throws Exception Throws exception if failed to send > > */ > > final public void send(Relation deltaRelation) throws Exception { > > Relation rel = this.callback.afterProcess(deltaRelation, collector, > > coordinator); > > @@ -87,6 +100,9 @@ public class SimpleMessageCollector implements > > MessageCollector { > > > > /** > > * Method is declared to be final s.t. we enforce that the callback > > functions are called first > > + * > > + * @param tuple The tuple to be sent out > > + * @throws Exception Throws exception if failed to send > > */ > > final public void send(Tuple tuple) throws Exception { > > Tuple otuple = this.callback.afterProcess(tuple, collector, > > coordinator); > > @@ -106,9 +122,4 @@ public class SimpleMessageCollector implements > > MessageCollector { > > protected void realSend(Tuple tuple) throws Exception { > > this.collector.send((OutgoingMessageEnvelope) tuple.getMessage()); > > } > > - > > - @Override > > - public void send(OutgoingMessageEnvelope envelope) { > > - this.collector.send(envelope); > > - } > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java > > > b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java > > index 20dc701..7370af6 100644 > > --- > > > a/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java > > +++ > > > b/samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java > > @@ -22,6 +22,7 @@ package org.apache.samza.task.sql; > > import org.apache.samza.config.Config; > > import 
org.apache.samza.sql.api.data.Relation; > > import org.apache.samza.sql.api.data.Tuple; > > +import org.apache.samza.sql.api.operators.Operator; > > import org.apache.samza.sql.api.operators.OperatorCallback; > > import org.apache.samza.sql.data.IncomingMessageTuple; > > import org.apache.samza.sql.operators.window.BoundedTimeWindow; > > @@ -39,7 +40,7 @@ import org.apache.samza.task.WindowableTask; > > * > > */ > > public class RandomWindowOperatorTask implements StreamTask, > > InitableTask, WindowableTask { > > - private BoundedTimeWindow wndOp; > > + private Operator operator; > > > > private final OperatorCallback wndCallback = new OperatorCallback() { > > > > @@ -77,20 +78,20 @@ public class RandomWindowOperatorTask implements > > StreamTask, InitableTask, Windo > > public void process(IncomingMessageEnvelope envelope, MessageCollector > > collector, TaskCoordinator coordinator) > > throws Exception { > > // based on tuple's stream name, get the window op and run process() > > - wndOp.process(new IncomingMessageTuple(envelope), collector, > > coordinator); > > + operator.process(new IncomingMessageTuple(envelope), collector, > > coordinator); > > > > } > > > > @Override > > public void window(MessageCollector collector, TaskCoordinator > > coordinator) throws Exception { > > // based on tuple's stream name, get the window op and run process() > > - wndOp.refresh(System.nanoTime(), collector, coordinator); > > + operator.refresh(System.nanoTime(), collector, coordinator); > > } > > > > @Override > > public void init(Config config, TaskContext context) throws Exception > { > > // 1. 
create a fixed length 10 sec window operator > > - this.wndOp = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", > > "relation1", this.wndCallback); > > - this.wndOp.init(config, context); > > + this.operator = new BoundedTimeWindow("wndOp1", 10, "kafka:stream1", > > "wndOutput", this.wndCallback); > > + this.operator.init(config, context); > > } > > } > > > > > > > http://git-wip-us.apache.org/repos/asf/samza/blob/45b85477/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java > > ---------------------------------------------------------------------- > > diff --git > > > a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java > > > b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java > > index 9124e3c..d65892c 100644 > > --- > > > a/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java > > +++ > > > b/samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java > > @@ -24,7 +24,7 @@ import java.util.List; > > > > import org.apache.samza.config.Config; > > import org.apache.samza.sql.data.IncomingMessageTuple; > > -import org.apache.samza.sql.operators.factory.SimpleRouter; > > +import org.apache.samza.sql.operators.SimpleRouter; > > import org.apache.samza.sql.operators.join.StreamStreamJoin; > > import org.apache.samza.sql.operators.partition.PartitionOp; > > import org.apache.samza.sql.operators.window.BoundedTimeWindow; > > @@ -51,25 +51,25 @@ import org.apache.samza.task.WindowableTask; > > */ > > public class StreamSqlTask implements StreamTask, InitableTask, > > WindowableTask { > > > > - private SimpleRouter rteCntx; > > + private SimpleRouter router; > > > > @Override > > public void process(IncomingMessageEnvelope envelope, MessageCollector > > collector, TaskCoordinator coordinator) > > throws Exception { > > - this.rteCntx.process(new IncomingMessageTuple(envelope), collector, > > coordinator); > > + this.router.process(new IncomingMessageTuple(envelope), 
collector, > > coordinator); > > } > > > > @Override > > public void window(MessageCollector collector, TaskCoordinator > > coordinator) throws Exception { > > - this.rteCntx.refresh(System.nanoTime(), collector, coordinator); > > + this.router.refresh(System.nanoTime(), collector, coordinator); > > } > > > > @Override > > public void init(Config config, TaskContext context) throws Exception > { > > // create all operators via the operator factory > > // 1. create two window operators > > - BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, > > "inputStream1", "fixedWndOutput1"); > > - BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, > > "inputStream2", "fixedWndOutput2"); > > + BoundedTimeWindow wnd1 = new BoundedTimeWindow("fixedWnd1", 10, > > "kafka:inputStream1", "fixedWndOutput1"); > > + BoundedTimeWindow wnd2 = new BoundedTimeWindow("fixedWnd2", 10, > > "kafka:inputStream2", "fixedWndOutput2"); > > // 2. create one join operator > > @SuppressWarnings("serial") > > List<String> inputRelations = new ArrayList<String>() { > > @@ -86,19 +86,19 @@ public class StreamSqlTask implements StreamTask, > > InitableTask, WindowableTask { > > } > > }; > > StreamStreamJoin join = new StreamStreamJoin("joinOp", > > inputRelations, "joinOutput", joinKeys); > > - // 4. create a re-partition operator > > + // 3. create a re-partition operator > > PartitionOp par = new PartitionOp("parOp1", "joinOutput", "kafka", > > "parOutputStrm1", "joinKey", 50); > > > > // Now, connecting the operators via the OperatorRouter > > - this.rteCntx = new SimpleRouter(); > > + this.router = new SimpleRouter(); > > // 1. set two system input operators (i.e. two window operators) > > - this.rteCntx.addOperator(wnd1); > > - this.rteCntx.addOperator(wnd2); > > + this.router.addOperator(wnd1); > > + this.router.addOperator(wnd2); > > // 2. connect join operator to both window operators > > - this.rteCntx.addOperator(join); > > + this.router.addOperator(join); > > // 3. 
connect re-partition operator to the stream operator > > - this.rteCntx.addOperator(par); > > + this.router.addOperator(par); > > > > - this.rteCntx.init(config, context); > > + this.router.init(config, context); > > } > > } > > > > > > > -- > Milinda Pathirage > > PhD Student | Research Assistant > School of Informatics and Computing | Data to Insight Center > Indiana University > > twitter: milindalakmal > skype: milinda.pathirage > blog: http://milinda.pathirage.org >