Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/347#discussion_r23914605
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---
    @@ -83,6 +93,107 @@ public DataSink(DataSet<T> data, OutputFormat<T> 
format, TypeInformation<T> type
        }
     
        /**
    +    * Sorts each local partition of a {@link 
org.apache.flink.api.java.tuple.Tuple} data set
    +    * on the specified field in the specified {@link Order} before it is 
emitted by the output format.</br>
    +    * <b>Note: Only tuple data sets can be sorted using integer field 
indices.</b><br/>
    +    * The tuple data set can be sorted on multiple fields in different 
orders
    +    * by chaining {@link #sortLocalOutput(int, Order)} calls.
    +    *
    +    * @param field The Tuple field on which the data set is locally sorted.
    +    * @param order The Order in which the specified Tuple field is locally 
sorted.
    +    * @return This data sink operator with specified output order.
    +    *
    +    * @see org.apache.flink.api.java.tuple.Tuple
    +    * @see Order
    +    */
    +   public DataSink<T> sortLocalOutput(int field, Order order) {
    +
    +           if (!this.type.isTupleType()) {
    +                   throw new InvalidProgramException("Specifying order 
keys via field positions is only valid for tuple data types");
    +           }
    +           if (field >= this.type.getArity()) {
    +                   throw new InvalidProgramException("Order key out of 
tuple bounds.");
    +           }
    +
    +           if(this.sortKeyPositions == null) {
    +                   // set sorting info
    +                   this.sortKeyPositions = new int[] {field};
    +                   this.sortOrders = new Order[] {order};
    +           } else {
    +                   // append sorting info to exising info
    +                   int newLength = this.sortKeyPositions.length + 1;
    +                   this.sortKeyPositions = 
Arrays.copyOf(this.sortKeyPositions, newLength);
    +                   this.sortOrders = Arrays.copyOf(this.sortOrders, 
newLength);
    +                   this.sortKeyPositions[newLength-1] = field;
    +                   this.sortOrders[newLength-1] = order;
    +           }
    +           return this;
    +   }
    +
    +   /**
    +    * Sorts each local partition of a data set on the field(s) specified 
by the field expression
    +    * in the specified {@link Order} before it is emitted by the output 
format.</br>
    +    * <b>Note: Non-composite types can only be sorted on the full element 
which is specified by
    +    * a wildcard expression ("*" or "_").</b><br/>
    +    * Data sets of composite types (Tuple or Pojo) can be sorted on 
multiple fields in different orders
    +    * by chaining {@link #sortLocalOutput(String, Order)} calls.
    +    *
    +    * @param fieldExpression The field expression for the field(s) on 
which the data set is locally sorted.
    +    * @param order The Order in which the specified field(s) are locally 
sorted.
    +    * @return This data sink operator with specified output order.
    +    *
    +    * @see Order
    +    */
    +   public DataSink<T> sortLocalOutput(String fieldExpression, Order order) 
{
    +
    +           int numFields;
    +           int[] fields;
    +           Order[] orders;
    +
    +           if(this.type instanceof CompositeType) {
    +                   // compute flat field positions for (nested) sorting 
fields
    +                   Keys.ExpressionKeys<T> ek;
    +                   try {
    +                           ek = new Keys.ExpressionKeys<T>(new 
String[]{fieldExpression}, this.type);
    +                   } catch(IllegalArgumentException iae) {
    +                           throw new 
InvalidProgramException(iae.getMessage());
    --- End diff --
    
    Why are you creating a new exception with the error message instead of 
forwarding the illegal argument exception?
    I personally like it very much when I can find the exact location where the 
exception was thrown.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to