Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1086#discussion_r52136638
  
    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -436,8 +444,91 @@ public Stream partitionAggregate(Fields inputFields, ReducerAggregator agg, Fiel
             return chainedAgg()
                    .partitionAggregate(inputFields, agg, functionFields)
                    .chainEnd();
    -    }  
    -    
    +    }
    +
    +    /**
    +     * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} and it is
    +     * assumed that its value is an instance of {@code Comparable}.
    +     *
    +     * @param inputFieldName input field name
    +     * @return
    +     */
    +    public Stream minBy(String inputFieldName) {
    +        Aggregator<ComparisonAggregator.State> min = new Min(inputFieldName);
    +        return comparableAggregateStream(inputFieldName, min);
    +    }
    +
    +    /**
    +     * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} in a stream by
    +     * using the given {@code comparator}.
    +     *
    +     * @param inputFieldName input field name
    +     * @param comparator comparator used in for finding minimum of two tuple values of {@code inputFieldName}.
    +     * @param <T> type of tuple's given input field value.
    +     * @return
    +     */
    +    public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) {
    +        Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(inputFieldName, comparator);
    +        return comparableAggregateStream(inputFieldName, min);
    +    }
    +
    +    /**
    +     * This aggregator operation computes the minimum of tuples in a stream by using the given {@code comparator} with
    +     * {@code TridentTuple}s.
    +     *
    +     * @param comparator comparator used in for finding minimum of two tuple values.
    +     * @return
    +     */
    +    public Stream min(Comparator<TridentTuple> comparator) {
    +        Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(comparator);
    +        return comparableAggregateStream(null, min);
    +    }
    +
    +    /**
    +     * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} and it is
    +     * assumed that its value is an instance of {@code Comparable}.
    --- End diff --
    
    What would the behavior be if the values are not `Comparable`? Maybe document that as well.
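
To make the question concrete, here is a minimal plain-Java sketch of what typically happens when a minimum is computed through an unchecked cast to `Comparable`. This is not the `Min` aggregator from this PR, whose internals are not shown in the diff; `MinByComparableSketch`, `minOf` and `describe` are hypothetical names used only for illustration, and whether Trident would surface the same failure is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in, NOT the Min aggregator from this PR.
public class MinByComparableSketch {

    // Finds the minimum by casting each value to Comparable (assumed approach).
    @SuppressWarnings("unchecked")
    static Object minOf(List<?> values) {
        Object min = null;
        for (Object value : values) {
            // The cast is only reached from the second value onwards,
            // so a single non-Comparable value would pass silently.
            if (min == null || ((Comparable<Object>) value).compareTo(min) < 0) {
                min = value;
            }
        }
        return min;
    }

    // Reports either the minimum or the failure mode as a string.
    static String describe(List<?> values) {
        try {
            return "min=" + minOf(values);
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Integers implement Comparable, so the minimum is found.
        System.out.println(describe(Arrays.asList(3, 1, 2)));
        // Plain Objects do not implement Comparable, so the cast fails.
        System.out.println(describe(Arrays.asList(new Object(), new Object())));
    }
}
```

If the aggregator behaves like this sketch, the javadoc could state that a `ClassCastException` is thrown at runtime for non-`Comparable` values, or the method could validate the value and fail with a clearer message.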

