[ https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362957#comment-15362957 ]
ASF GitHub Bot commented on APEXMALHAR-2085:
--------------------------------------------

Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/319#discussion_r69614298

    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/Accumulation.java ---
    @@ -0,0 +1,76 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.window;
    +
    +import org.apache.hadoop.classification.InterfaceStability;
    +
    +/**
    + * This interface is for the processing part of the WindowedOperator.
    + * We can assume that all stateful processing of the WindowedOperator is a form of accumulation.
    + *
    + * In most cases, AccumT is the same as OutputT. But in some cases, the accumulated type and the output type may be
    + * different. For example, if we are doing the AVERAGE of doubles, InputT will be double, and we need the SUM and the
    + * COUNT stored as type AccumT, and AccumT will be a pair of double and long, in which double is the sum of the inputs,
    + * and long is the number of inputs. OutputT will be double, because it represents the average of the inputs.
    + */
    +@InterfaceStability.Evolving
    +public interface Accumulation<InputT, AccumT, OutputT>
    +{
    +  /**
    +   * Returns the default accumulated value when nothing has been accumulated
    +   *
    +   * @return
    +   */
    +  AccumT defaultAccumulatedValue();
    +
    +  /**
    +   * Accumulates the input to the accumulated value
    +   *
    +   * @param accumulatedValue
    +   * @param input
    +   * @return
    +   */
    +  AccumT accumulate(AccumT accumulatedValue, InputT input);
    --- End diff --

    @vrozov Good point. I'm assuming you're asking why the signature isn't:

    ```java
    void accumulate(AccumT accumulatedValue, InputT input);
    ```

    with accumulatedValue updated in place. Doing that would make the interface much less flexible, because the underlying storage might not support in-place updates. For example, if the storage only offers get(key) and put(key, value), and get(key) returns a copy rather than a reference to the stored object (possibly as a result of deserialization), then an in-place update would modify only the copy and would be lost.


> Implement Windowed Operators
> ----------------------------
>
>                 Key: APEXMALHAR-2085
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Siyuan Hua
>            Assignee: David Yan
>
> As discussed several times recently in the community, a group of Windowed
> Operators that delivers window semantics following the Google Dataflow
> model (https://cloud.google.com/dataflow/) is very important.
> The operators should be designed and implemented to support:
> * the High-Level API
> * Beam translation
> * easy use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators:
> The windowed operators should cover all possible transformations that require
> windows. Batch processing is also treated as a special window called the
> global window.
> {code}
>                    +-------------------+
>        +---------> | WindowedOperator  | <-----+
>        |           +--------+----------+       |
>        |                    ^     ^-----------------------------+
>        |                    |                  |                |
>        |                    |                  |                |
> +------+--------+    +------+------+   +-------+-----+   +------+-----+
> |CombineOperator|    |GroupOperator|   |KeyedOperator|   |JoinOperator|
> +---------------+    +-------------+   +------+------+   +-----+------+
>                                +---------^    ^                ^
>                                |              |                |
>                       +--------+---+    +-----+----+      +----+----+
>                       |KeyedCombine|    |KeyedGroup|      | CoGroup |
>                       +------------+    +----------+      +---------+
> {code}
> The Combine operation includes all operations that combine all tuples in one
> window into one or a small number of tuples. The Group operation groups all
> tuples in one window. Join and CoGroup are used to join and group tuples from
> different inputs.
> {panel}
> {panel:title=Components}
> * Window Component
> It includes the configuration, the window state that should be checkpointed, etc.
> It should support NonMergibleWindow (fixed or sliding) and MergibleWindow (session).
> * Trigger
> It should support early and late triggers with customizable trigger
> behaviour.
> * Other related components:
> ** Watermark generator, which can be plugged into an input source to generate watermarks
> ** Tuple schema support:
> It should handle either a predefined tuple type or provide a declarative API to
> describe a user-defined tuple class.
> {panel}
> Most of the component APIs should be reused in the High-Level API.
> This is the umbrella ticket; separate tickets will be created for the different
> components and operators.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
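The AVERAGE case described in the `Accumulation` javadoc, together with the reason davidyan74 gives for returning a new `AccumT` from `accumulate` instead of mutating it in place, can be illustrated with a small self-contained sketch. It assumes a trimmed-down local copy of the interface that contains only the two methods visible in the diff. `SumAndCount`, `toOutput`, `CopyingStore` and `averageOf` are hypothetical names introduced only for illustration; `CopyingStore` stands in for a get/put store whose `get` returns a copy, as a deserializing store would. This is not the Malhar implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class AverageAccumulationSketch
{
  // Local, trimmed-down copy of the two methods shown in the diff (assumption, not the full Malhar interface).
  interface Accumulation<InputT, AccumT, OutputT>
  {
    AccumT defaultAccumulatedValue();

    AccumT accumulate(AccumT accumulatedValue, InputT input);
  }

  // Hypothetical AccumT for AVERAGE: the sum of the inputs (double) and the number of inputs (long).
  static final class SumAndCount
  {
    final double sum;
    final long count;

    SumAndCount(double sum, long count)
    {
      this.sum = sum;
      this.count = count;
    }
  }

  static class Average implements Accumulation<Double, SumAndCount, Double>
  {
    @Override
    public SumAndCount defaultAccumulatedValue()
    {
      return new SumAndCount(0.0, 0L);
    }

    @Override
    public SumAndCount accumulate(SumAndCount accumulatedValue, Double input)
    {
      // Returns a new value; the argument is never mutated.
      return new SumAndCount(accumulatedValue.sum + input, accumulatedValue.count + 1);
    }

    // Hypothetical: the diff excerpt does not show how OutputT is produced from AccumT.
    Double toOutput(SumAndCount accumulatedValue)
    {
      return accumulatedValue.count == 0 ? Double.NaN : accumulatedValue.sum / accumulatedValue.count;
    }
  }

  // Hypothetical key-value store whose get() returns a copy rather than a reference.
  static class CopyingStore
  {
    private final Map<String, SumAndCount> map = new HashMap<>();

    SumAndCount get(String key)
    {
      SumAndCount v = map.get(key);
      return v == null ? null : new SumAndCount(v.sum, v.count);
    }

    void put(String key, SumAndCount value)
    {
      map.put(key, value);
    }
  }

  // Hypothetical helper: accumulates all inputs under one key and returns their average.
  public static double averageOf(String key, double... inputs)
  {
    Average avg = new Average();
    CopyingStore store = new CopyingStore();
    for (double input : inputs) {
      SumAndCount current = store.get(key);
      if (current == null) {
        current = avg.defaultAccumulatedValue();
      }
      // get -> accumulate -> put works even though get() hands back a copy.
      store.put(key, avg.accumulate(current, input));
    }
    SumAndCount result = store.get(key);
    return avg.toOutput(result == null ? avg.defaultAccumulatedValue() : result);
  }

  public static void main(String[] args)
  {
    System.out.println(averageOf("window-1", 1.0, 2.0, 6.0)); // prints 3.0
  }
}
```

Because `accumulate` returns the new value, the caller always writes the result back with `put`, so the pattern does not depend on whether `get` returns a live reference or a copy. With the in-place `void` signature, mutating the copy returned by `CopyingStore.get` would change nothing in the store, and an immutable accumulator such as `SumAndCount` could not be updated in place at all.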