[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15311123#comment-15311123
 ] 

David Yan edited comment on APEXMALHAR-2085 at 6/1/16 9:41 PM:
---------------------------------------------------------------

Based on my understanding, this is roughly what needs to be done, at a very 
high level.
Assuming T is the type of the tuples:

1. Watermark generator operator that takes T as the input and generates 
TimestampedValue<T> tuples interleaved with watermark tuples. The watermark 
generator takes the following as configuration:
- The function to get the timestamp, equivalent to a lambda T -> milliseconds 
since the epoch
- Watermark type (perfect, heuristic, etc.). I need to do a little more 
research on how watermarks are actually generated
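To make the watermark generator concrete, here is a minimal Java sketch, assuming a heuristic watermark defined as the largest event timestamp seen so far minus a fixed out-of-orderness bound. `TimestampedTuple` and `HeuristicWatermarkGenerator` are hypothetical names, not Apex or Beam classes; a perfect watermark would need knowledge of the source that this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for Beam's TimestampedValue<T>: a value plus its event time.
final class TimestampedTuple<T> {
  final T value;
  final long timestampMillis;

  TimestampedTuple(T value, long timestampMillis) {
    this.value = value;
    this.timestampMillis = timestampMillis;
  }
}

// Hypothetical heuristic watermark generator: the watermark trails the maximum
// event time seen so far by a configurable out-of-orderness bound.
final class HeuristicWatermarkGenerator<T> {
  private final ToLongFunction<T> timestampFn; // T -> milliseconds since the epoch
  private final long maxOutOfOrdernessMillis;
  private long maxSeenMillis = Long.MIN_VALUE;
  private long lastEmittedWatermark = Long.MIN_VALUE;
  // Watermarks emitted so far, in order (stands in for a watermark output port).
  final List<Long> emittedWatermarks = new ArrayList<>();

  HeuristicWatermarkGenerator(ToLongFunction<T> timestampFn, long maxOutOfOrdernessMillis) {
    this.timestampFn = timestampFn;
    this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
  }

  // Wraps the input tuple with its event timestamp and advances the watermark.
  TimestampedTuple<T> process(T tuple) {
    long ts = timestampFn.applyAsLong(tuple);
    maxSeenMillis = Math.max(maxSeenMillis, ts);
    long candidate = maxSeenMillis - maxOutOfOrdernessMillis;
    if (candidate > lastEmittedWatermark) { // a watermark never moves backwards
      lastEmittedWatermark = candidate;
      emittedWatermarks.add(candidate);
    }
    return new TimestampedTuple<>(tuple, ts);
  }

  long currentWatermark() {
    return lastEmittedWatermark;
  }
}
```

Because the watermark only moves forward, an out-of-order tuple is still stamped with its own timestamp but does not pull the watermark back.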

2. A modified DimensionOperator. This has two stages:

* Stage 1: Window generator that takes TimestampedValue<T> as the input and 
generates WindowedValue<T> (WindowedValue is an abstract class from Beam that 
carries the window information for the tuple).
   ** The WindowFn object to assign the window(s) for each tuple
   ** Possibility of merging windows
* Stage 2: The actual pane generation, which takes WindowedValue<T> as the 
input and produces WindowedValue<R> as the output. The output includes any 
retraction values (resulting from lateness and window merging)
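Stage 1's window assignment can be sketched as follows, assuming half-open event-time windows [start, end) and treating fixed windows as sliding windows whose period equals their size. `TimeWindow`, `WindowAssigner`, and `SlidingWindows` are hypothetical stand-ins loosely modelled on the role of Beam's WindowFn, not its actual API; merging (session) windows are not covered here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical half-open event-time window [start, end).
final class TimeWindow {
  final long start;
  final long end;

  TimeWindow(long start, long end) {
    this.start = start;
    this.end = end;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TimeWindow)) {
      return false;
    }
    TimeWindow other = (TimeWindow) o;
    return start == other.start && end == other.end;
  }

  @Override
  public int hashCode() {
    return 31 * Long.hashCode(start) + Long.hashCode(end);
  }

  @Override
  public String toString() {
    return "[" + start + ", " + end + ")";
  }
}

// Hypothetical window assignment: maps an event timestamp to the window(s) containing it.
interface WindowAssigner {
  List<TimeWindow> assignWindows(long timestampMillis);
}

// Sliding windows of length sizeMillis, one starting every periodMillis.
// Fixed (tumbling) windows are the special case periodMillis == sizeMillis.
final class SlidingWindows implements WindowAssigner {
  private final long sizeMillis;
  private final long periodMillis;

  SlidingWindows(long sizeMillis, long periodMillis) {
    this.sizeMillis = sizeMillis;
    this.periodMillis = periodMillis;
  }

  @Override
  public List<TimeWindow> assignWindows(long timestampMillis) {
    List<TimeWindow> windows = new ArrayList<>();
    // Start of the latest window containing the timestamp (floorMod handles negatives).
    long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
    for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
      windows.add(new TimeWindow(start, start + sizeMillis));
    }
    return windows;
  }
}
```

For example, with a size of 10 ms and a period of 5 ms, timestamp 12 is assigned to [10, 20) and [5, 15); with fixed 10 ms windows it lands only in [10, 20).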

This operator takes the following as configuration:
  - The windowing function (type WindowFn): Describes how tuples are assigned 
to windows
  - Accumulation mode (type Enum): Accumulating, Discarding, or Accumulating & 
Retracting
  - Allowed lateness (type Duration): For dropping late tuples and purging old 
state (in conjunction with the committed checkpoint)
  - The aggregation (type lambda Iterable<T> -> R): How we want to aggregate 
the tuple data
  - Triggering (type Trigger): When we actually emit the result to the output 
port
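The difference between the three accumulation modes can be illustrated with a per-window pane buffer, assuming the aggregation is a plain `Function<Iterable<T>, R>` and that a retraction is simply the previously emitted result flagged as retracted. `PaneState`, `PaneOutput`, and `AccumulationMode` are hypothetical names for this sketch; trigger and allowed-lateness handling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// The three accumulation modes from the operator configuration.
enum AccumulationMode { DISCARDING, ACCUMULATING, ACCUMULATING_AND_RETRACTING }

// One emitted result; isRetraction marks a value that cancels an earlier output.
final class PaneOutput<R> {
  final R value;
  final boolean isRetraction;

  PaneOutput(R value, boolean isRetraction) {
    this.value = value;
    this.isRetraction = isRetraction;
  }
}

// Hypothetical per-window state: buffers tuples and, each time the trigger
// fires, aggregates them according to the accumulation mode.
final class PaneState<T, R> {
  private final AccumulationMode mode;
  private final Function<Iterable<T>, R> aggregation; // lambda Iterable<T> -> R
  private final List<T> buffer = new ArrayList<>();
  private R lastEmitted; // previous result, needed only for retractions

  PaneState(AccumulationMode mode, Function<Iterable<T>, R> aggregation) {
    this.mode = mode;
    this.aggregation = aggregation;
  }

  void add(T tuple) {
    buffer.add(tuple);
  }

  // Called when the trigger fires; returns what goes to the output port.
  List<PaneOutput<R>> fire() {
    List<PaneOutput<R>> out = new ArrayList<>();
    R result = aggregation.apply(buffer);
    if (mode == AccumulationMode.ACCUMULATING_AND_RETRACTING && lastEmitted != null) {
      out.add(new PaneOutput<>(lastEmitted, true)); // retract the earlier result first
    }
    out.add(new PaneOutput<>(result, false));
    if (mode == AccumulationMode.DISCARDING) {
      buffer.clear(); // the next pane starts from an empty buffer
    } else {
      lastEmitted = result; // accumulating modes keep the buffered tuples
    }
    return out;
  }
}
```

With a sum aggregation, adding 1 and 2, firing, then adding 4 and firing again emits 3 then 4 in Discarding mode, 3 then 7 in Accumulating mode, and 3 then (retract 3, 7) in Accumulating & Retracting mode.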

The DimensionOperator will need to implement the above features.
Also, the DimensionOperator will need to check whether T is actually a KV 
(via instanceof), and if so, aggregate the tuples by window AND key.
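A sketch of the window-and-key grouping, assuming a hypothetical `KeyValue` type in place of Beam's KV, windows identified by their start timestamp for brevity, and a per-(window, key) count as a stand-in for the configured aggregation. `WindowKeyGrouper` is likewise a hypothetical name. Non-KV tuples fall back to grouping by window alone.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical key/value pair, standing in for Beam's KV<K, V>.
final class KeyValue<K, V> {
  final K key;
  final V value;

  KeyValue(K key, V value) {
    this.key = key;
    this.value = value;
  }
}

// Hypothetical grouping step: counts tuples per (window, key) when the tuple is
// a KeyValue, and per window alone otherwise.
final class WindowKeyGrouper {
  // Sentinel key shared by all non-keyed tuples, so they group by window only.
  private static final Object NO_KEY = new Object();
  private final Map<Map.Entry<Long, Object>, Long> counts = new HashMap<>();

  void add(long windowStart, Object tuple) {
    // The instanceof check decides whether the key takes part in the grouping.
    Object key = (tuple instanceof KeyValue) ? ((KeyValue<?, ?>) tuple).key : NO_KEY;
    counts.merge(new AbstractMap.SimpleImmutableEntry<Long, Object>(windowStart, key), 1L, Long::sum);
  }

  long countForKey(long windowStart, Object key) {
    return counts.getOrDefault(new AbstractMap.SimpleImmutableEntry<Long, Object>(windowStart, key), 0L);
  }

  long countForWindow(long windowStart) {
    return countForKey(windowStart, NO_KEY);
  }
}
```

Using the (window, key) pair as a single map key keeps the keyed and non-keyed cases in one state structure, which would also be the unit to checkpoint and purge.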

This is very preliminary and it's possible that I'm going down the wrong path. 
So please provide your feedback.


> Implement Windowed Operators
> ----------------------------
>
>                 Key: APEXMALHAR-2085
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Siyuan Hua
>            Assignee: Siyuan Hua
>
> As per several of our recent discussions in the community, a group of windowed 
> operators that delivers window semantics following the Google Dataflow 
> model (https://cloud.google.com/dataflow/) is very important. 
> The operators should be designed and implemented so that they support:
> * the high-level API
> * Beam translation
> * easy use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators:
> The windowed operators should cover all possible transformations that require 
> windows; batch processing is also treated as a special window called the 
> global window.
> {code}
>                    +-------------------+
>        +---------> |  WindowedOperator | <--------+
>        |           +--------+----------+          |
>        |                    ^      ^--------------------------------+
>        |                    |                     |                 |
>        |                    |                     |                 |
> +------+--------+    +------+------+      +-------+-----+    +------+-----+
> |CombineOperator|    |GroupOperator|      |KeyedOperator|    |JoinOperator|
> +---------------+    +-------------+      +------+------+    +-----+------+
>                                    +---------^   ^                 ^
>                                    |             |                 |
>                           +--------+---+   +-----+----+       +----+----+
>                           |KeyedCombine|   |KeyedGroup|       | CoGroup |
>                           +------------+   +----------+       +---------+
> {code}
> The Combine operation includes all operations that combine all tuples in one 
> window into one or a small number of tuples; the Group operation groups all 
> tuples in one window; Join and CoGroup are used to join and group tuples from 
> different inputs.
> {panel}
> {panel:title=Components}
> * Window Component
> It includes the configuration, the window state that should be checkpointed, 
> etc. It should support NonMergibleWindow (fixed or sliding) and 
> MergibleWindow (session).
> * Trigger
> It should support early and late triggers with customizable trigger 
> behaviour.
> * Other related components:
> ** Watermark generator, can be plugged into input source to generate watermark
> ** Tuple schema support:
> It should either handle predefined tuple types or provide a declarative API 
> to describe user-defined tuple classes.
> {panel}
> Most component APIs should be reused in the high-level API.
> This is the umbrella ticket; separate tickets will be created for the 
> different components and operators.
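The session (MergibleWindow) case can be sketched as follows, assuming each tuple opens a proto-window [timestamp, timestamp + gap) and overlapping proto-windows merge into one session. `SessionWindows` is a hypothetical name, and for brevity the merge runs over a batch of timestamps rather than incrementally as tuples arrive, which is what an operator would actually have to do.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical session windowing: every tuple opens a proto-window
// [ts, ts + gap), and overlapping proto-windows are merged into one session.
final class SessionWindows {
  static final class Session {
    final long start;
    final long end; // exclusive

    Session(long start, long end) {
      this.start = start;
      this.end = end;
    }
  }

  private final long gapMillis;

  SessionWindows(long gapMillis) {
    this.gapMillis = gapMillis;
  }

  List<Session> merge(List<Long> timestamps) {
    List<Long> sorted = new ArrayList<>(timestamps);
    sorted.sort(Comparator.naturalOrder());
    List<Session> sessions = new ArrayList<>();
    for (long ts : sorted) {
      long end = ts + gapMillis;
      if (!sessions.isEmpty() && ts < sessions.get(sessions.size() - 1).end) {
        // Overlaps the latest session: extend it instead of opening a new one.
        Session last = sessions.remove(sessions.size() - 1);
        sessions.add(new Session(last.start, Math.max(last.end, end)));
      } else {
        sessions.add(new Session(ts, end));
      }
    }
    return sessions;
  }
}
```

With a 10 ms gap, timestamps 0, 5, 12 and 30 yield the sessions [0, 22) and [30, 40). Merging like this is what makes retractions necessary: a result already emitted for [0, 15) would have to be retracted once the tuple at 12 extends that session.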



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
