[ https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348758#comment-15348758 ]
ASF GitHub Bot commented on APEXMALHAR-2085: -------------------------------------------- Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/319#discussion_r68469300 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java --- @@ -0,0 +1,486 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import javax.validation.ValidationException; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.lib.window.WindowState; +import org.apache.apex.malhar.lib.window.WindowedOperator; +import org.apache.apex.malhar.lib.window.WindowedStorage; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Function; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; + +/** + * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating + * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated + * values are stored in the storage. 
+ * + */ +@InterfaceStability.Evolving +public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, AccumulationT extends Accumulation> + extends BaseOperator implements WindowedOperator<InputT> +{ + + protected WindowOption windowOption; + protected TriggerOption triggerOption; + protected long allowedLatenessMillis = -1; + protected WindowedStorage<WindowState> windowStateMap; + + private Function<InputT, Long> timestampExtractor; + + private long currentWatermark; + private boolean triggerAtWatermark; + private long earlyTriggerCount; + private long earlyTriggerMillis; + private long lateTriggerCount; + private long lateTriggerMillis; + private long currentDerivedTimestamp = -1; + private long windowWidthMillis; + protected DataStorageT dataStorage; + protected DataStorageT retractionStorage; + protected AccumulationT accumulation; + + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class); + + public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>() + { + @Override + public void process(Tuple<InputT> tuple) + { + processTuple(tuple); + } + }; + + // TODO: This port should be removed when Apex Core has native support for custom control tuples + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<ControlTuple> controlInput = new DefaultInputPort<ControlTuple>() + { + @Override + public void process(ControlTuple tuple) + { + if (tuple instanceof ControlTuple.Watermark) { + processWatermark((ControlTuple.Watermark)tuple); + } + } + }; + + + // TODO: multiple input ports for join operations + + public final transient DefaultOutputPort<Tuple<OutputT>> output = new DefaultOutputPort<>(); + + // TODO: This port should be removed when Apex Core has native support for custom control tuples + public final transient DefaultOutputPort<ControlTuple> controlOutput = new DefaultOutputPort<>(); + + /** + * Process 
the incoming data tuple + * + * @param tuple + */ + public void processTuple(Tuple<InputT> tuple) + { + long timestamp = extractTimestamp(tuple); --- End diff -- The extractTimestamp method is called here, then again in getWindowsValue and in assignWindow for the same tuple. If it's a TimestampedTuple, that's fine, as it's simply a getter method; but if a timestampExtractor is set, extracting the timestamp might get costly depending on the extractor's logic. Can this call be made only once? > Implement Windowed Operators > ---------------------------- > > Key: APEXMALHAR-2085 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085 > Project: Apache Apex Malhar > Issue Type: New Feature > Reporter: Siyuan Hua > Assignee: David Yan > > As per several of our recent discussions in the community, a group of Windowed > Operators that delivers window semantics following the Google Dataflow > model (https://cloud.google.com/dataflow/) is very important. > The operators should be designed and implemented to support: > High-level API > Beam translation > Easy use with other popular operators > {panel:title=Operator Hierarchy} > Hierarchy of the operators: > The windowed operators should cover all possible transformations that require > windowing; batch processing is also treated as a special window called the > global window. > {code} > +-------------------+ > +---------> | WindowedOperator | <--------+ > | +--------+----------+ | > | ^ ^--------------------------------+ > | | | | > | | | | > +------+--------+ +------+------+ +-------+-----+ +------+-----+ > |CombineOperator| |GroupOperator| |KeyedOperator| |JoinOperator| > +---------------+ +-------------+ +------+------+ +-----+------+ > +---------^ ^ ^ > | | | > +--------+---+ +-----+----+ +----+----+ > |KeyedCombine| |KeyedGroup| | CoGroup | > +------------+ +----------+ +---------+ > {code} > The Combine operation includes all operations that combine all tuples in one > window into one or a small number of tuples; the Group
operation groups all tuples > in one window; Join and CoGroup are used to join and group tuples from > different inputs. > {panel} > {panel:title=Components} > * Window Component > It includes configuration, the window state that should be checkpointed, etc. It > should support NonMergibleWindow (fixed or sliding) and MergibleWindow (session) > * Trigger > It should support early triggers and late triggers, with customizable trigger > behaviour > * Other related components: > ** Watermark generator, which can be plugged into an input source to generate watermarks > ** Tuple schema support: > It should handle either a predefined tuple type or provide a declarative API to > describe the user-defined tuple class > {panel} > Most component APIs should be reused in the High-Level API > This is the umbrella ticket; separate tickets will be created for the different > components and operators respectively -- This message was sent by Atlassian JIRA (v6.3.4#6332)