[ 
https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208263#comment-15208263
 ] 

ASF GitHub Bot commented on STORM-676:
--------------------------------------

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1072#discussion_r57143253
  
    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -594,19 +607,155 @@ public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functi
                     .aggregate(inputFields, agg, functionFields)
                     .chainEnd();
         }
    -    
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
    +     *
    +     * @param windowCount represents no of tuples in the window
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
    +                                      Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingCountWindow.of(windowCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
    +     * and slides the window with {@code slideCount}.
    +     *
    +     * @param windowCount represents tuples count of a window
    +     * @param slideCount the number of tuples after which the window slides
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
    +                                     Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(SlidingCountWindow.of(windowCount, slideCount), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window tumbles at duration of {@code windowDuration}
    +     *
    +     * @param windowDuration represents tumbling window duration configuration
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    +     */
    +    public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
    +                                     Fields inputFields, Aggregator aggregator, Fields functionFields) {
    +        return window(TumblingDurationWindow.of(windowDuration), windowStoreFactory, inputFields, aggregator, functionFields);
    +    }
    +
    +    /**
    +     * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slideDuration}
    +     * and completes a window at {@code windowDuration}
    +     *
    +     * @param windowDuration represents window duration configuration
    +     * @param slidingInterval the time duration after which the window slides
    +     * @param windowStoreFactory intermediary tuple store for storing windowing tuples
    +     * @param inputFields projected fields for aggregator
    +     * @param aggregator aggregator to run on the window of tuples to compute the result and emit to the stream.
    +     * @param functionFields fields of values to emit with aggregation.
    +     *
    +     * @return
    --- End diff --
    
    The `@return` tag is empty here; suggest `@return the new Stream`.
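
For readers of this thread, the count-based window semantics described in the Javadoc above can be sketched in plain Java. This is only an illustration, not the Trident implementation: the class `CountWindows` and its methods are hypothetical names, and the sketch assumes that a sliding window emits the most recent `windowCount` tuples after every `slideCount` tuples (a partial window while fewer than `windowCount` tuples have arrived), with a tumbling window treated as the case where the slide equals the window size.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper, not part of Storm: illustrates count-based windows only.
final class CountWindows {

    /**
     * Sliding count window (assumed semantics): after every slideCount items,
     * emit a snapshot of the most recent windowCount items seen so far.
     */
    public static <T> List<List<T>> sliding(List<T> input, int windowCount, int slideCount) {
        if (windowCount <= 0 || slideCount <= 0) {
            throw new IllegalArgumentException("windowCount and slideCount must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        Deque<T> buffer = new ArrayDeque<>();
        int sinceLastEmit = 0;
        for (T item : input) {
            buffer.addLast(item);
            // Evict the oldest item once the buffer exceeds the window size.
            if (buffer.size() > windowCount) {
                buffer.removeFirst();
            }
            sinceLastEmit++;
            if (sinceLastEmit == slideCount) {
                windows.add(new ArrayList<>(buffer));
                sinceLastEmit = 0;
            }
        }
        return windows;
    }

    /** Tumbling count window: a sliding window whose slide equals its size. */
    public static <T> List<List<T>> tumbling(List<T> input, int windowCount) {
        return sliding(input, windowCount, windowCount);
    }

    public static void main(String[] args) {
        List<Integer> tuples = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(tumbling(tuples, 3));   // prints [[1, 2, 3], [4, 5, 6]]
        System.out.println(sliding(tuples, 4, 2)); // prints [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
    }
}
```

In the sketch, tumbling windows never share tuples, while consecutive sliding windows overlap whenever `slideCount` is smaller than `windowCount`.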


> Storm Trident support for sliding/tumbling windows
> --------------------------------------------------
>
>                 Key: STORM-676
>                 URL: https://issues.apache.org/jira/browse/STORM-676
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Sriharsha Chintalapani
>            Assignee: Satish Duggana
>             Fix For: 1.0.0, 2.0.0
>
>         Attachments: StormTrident_windowing_support-676.pdf
>
>




