[ 
https://issues.apache.org/jira/browse/FLINK-6070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Hutchison updated FLINK-6070:
----------------------------------
    Description: 
Since Java doesn't have built-in tuple types, I find myself using Flink tuples 
for a lot of tasks in Flink programs. One downside is that these tuples are not 
inherently comparable, so when you want to sort a collection of tuples, you 
have to provide a custom comparator.

I created a tuple sorting class, as follows. (Only the methods for Tuple2 are 
defined at the bottom; similar methods could be added for other tuple types.) I 
wanted to get feedback on whether something like this would be considered 
useful for Flink before I submitted a PR.

{code}
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;

/** A class for sorting collections of tuples. */
public class TupleSorter {
    /** Produce a Tuple comparator for the given number of fields, with the 
requested field priority and sort order. */
    private static Comparator<Tuple> newComparator(final int tupleLen, final 
int[] fieldPriority,
            final int[] sortDescendingIndices) {
        if (fieldPriority == null || fieldPriority.length != tupleLen) {
            throw new IllegalArgumentException("Invalid sort order");
        }
        boolean[] idxUsed = new boolean[tupleLen];
        for (int i = 0; i < fieldPriority.length; i++) {
            int idx = fieldPriority[i];
            if (idx < 0 || idx >= tupleLen) {
                throw new IllegalArgumentException("fieldPriority entry out of 
range: " + idx);
            }
            if (idxUsed[idx]) {
                throw new IllegalArgumentException("fieldPriority entry 
duplicated: " + idx);
            }
            idxUsed[idx] = true;
        }
        boolean[] sortDescending = new boolean[tupleLen];
        for (int i = 0; i < sortDescendingIndices.length; i++) {
            int idx = sortDescendingIndices[i];
            if (idx < 0 || idx >= tupleLen) {
                throw new IllegalArgumentException("sortDescendingIndices entry 
out of range: " + idx);
            }
            sortDescending[idx] = true;
        }
        return (tuple0, tuple1) -> {
            for (int i = 0; i < tupleLen; i++) {
                int idx = fieldPriority[i];
                @SuppressWarnings({ "rawtypes", "unchecked" })
                int diff = ((Comparable) 
tuple0.getField(idx)).compareTo((Comparable) tuple1.getField(idx));
                if (sortDescending[i]) {
                    diff = -diff;
                }
                if (diff != 0) {
                    return diff;
                }
            }
            return 0;
        };
    }

    /**
     * Sort a list of tuples.
     * 
     * @param list
     *            The list of tuples.
     * @param fieldPriority
     *            The sort priority for the fields (primary an secondary sort 
key): a permutation of the field indices 0
     *            and 1. The default sort order within a field is ascending.
     * @param sortDescendingIndices
     *            If provided, inverts the sort order for a given field index 
from ascending to descending order.
     */
    public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> void 
sort(final List<Tuple2<T0, T1>> list,
            final int[] fieldPriority, final int... sortDescendingIndices) {
        list.sort(newComparator(/* tupleLen = */ 2, fieldPriority, 
sortDescendingIndices));
    }

    /**
     * Produce a sorted copy of a collection of tuples.
     * 
     * @param list
     *            The list of tuples.
     * @param fieldPriority
     *            The sort priority for the fields (primary an secondary sort 
key): a permutation of the field indices 0
     *            and 1. The default sort order within a field is ascending.
     * @param sortDescendingIndices
     *            If provided, inverts the sort order for a given field index 
from ascending to descending order.
     */
    public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> 
ArrayList<Tuple2<T0, T1>> sortCopy(
            final Collection<Tuple2<T0, T1>> collection, final int[] 
fieldPriority,
            final int... sortDescendingIndices) {
        ArrayList<Tuple2<T0, T1>> list = new ArrayList<>(collection);
        Collections.sort(list, newComparator(/* tupleLen = */ 2, fieldPriority, 
sortDescendingIndices));
        return list;
    }
}
{code}

  was:
Since Java doesn't have built-in tuple types, I find myself using Flink tuples 
for a lot of tasks in Flink programs. One downside is that these tuples are not 
inherently comparable, so when you want to sort a collection of tuples, you 
have to provide a custom comparator.

I created a ComparableTuple2 type, as follows. I wanted to get feedback on 
whether something like this would be considered useful for Flink before I 
submitted a PR. Also, I don't know how high I should go with the field arity 
for a ComparableTuple -- presumably not as high as for non-comparable tuples?

{code}
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * A comparable tuple, consisting of comparable fields that act as primary and secondary sort keys.
 *
 * <p>Each field sorts in ascending order unless the corresponding invert flag was set when this
 * tuple was constructed. Only the flags of the tuple on which compareTo is invoked are consulted.
 */
public class ComparableTuple2<T0 extends Comparable<T0>, T1 extends Comparable<T1>> extends Tuple2<T0, T1>
        implements Comparable<Tuple2<T0, T1>> {
    private static final long serialVersionUID = 1L;

    /** If true, the first field (primary sort key) sorts in descending order. */
    private boolean invertSortOrder0;
    /** If true, the second field (secondary sort key) sorts in descending order. */
    private boolean invertSortOrder1;

    /** Create a 2-tuple without setting its fields; both fields sort in ascending order. */
    public ComparableTuple2() {
    }

    /**
     * Create a 2-tuple of comparable elements in which both fields sort in ascending order.
     * 
     * @param f0
     *            The first element, which is also the primary sort key, and sorts in ascending order.
     * @param f1
     *            The second element, which is also the secondary sort key, and sorts in ascending order.
     */
    public ComparableTuple2(T0 f0, T1 f1) {
        super(f0, f1);
    }

    /**
     * Create a comparable 2-tuple out of comparable elements.
     * 
     * @param f0
     *            The first element, which is also the primary sort key, and sorts in ascending order if
     *            invertSortOrder0 == false, else sorts in descending order.
     * @param f1
     *            The second element, which is also the secondary sort key, and sorts in ascending order if
     *            invertSortOrder1 == false, else sorts in descending order.
     * @param invertSortOrder0
     *            If true, invert the sort order for the first field (i.e. sort in descending order).
     * @param invertSortOrder1
     *            If true, invert the sort order for the second field (i.e. sort in descending order).
     */
    public ComparableTuple2(final T0 f0, final T1 f1, final boolean invertSortOrder0,
            final boolean invertSortOrder1) {
        super(f0, f1);
        this.invertSortOrder0 = invertSortOrder0;
        this.invertSortOrder1 = invertSortOrder1;
    }

    /**
     * Comparison function that compares first the primary sort key, f0, and then if equal, compares the
     * secondary sort key, f1.
     *
     * <p>A field's order is inverted by swapping the operands rather than negating the result, because
     * negating Integer.MIN_VALUE yields Integer.MIN_VALUE again.
     *
     * @param o
     *            The tuple to compare this tuple against.
     * @return A negative integer, zero, or a positive integer as this tuple sorts before, equal to, or
     *         after the given tuple.
     */
    @Override
    public int compareTo(final Tuple2<T0, T1> o) {
        final int diff0 = invertSortOrder0 ? o.f0.compareTo(this.f0) : this.f0.compareTo(o.f0);
        if (diff0 != 0) {
            return diff0;
        }
        return invertSortOrder1 ? o.f1.compareTo(this.f1) : this.f1.compareTo(o.f1);
    }
}
{code}


> Suggestion: add ComparableTuple types
> -------------------------------------
>
>                 Key: FLINK-6070
>                 URL: https://issues.apache.org/jira/browse/FLINK-6070
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>            Priority: Minor
>
> Since Java doesn't have built-in tuple types, I find myself using Flink 
> tuples for a lot of tasks in Flink programs. One downside is that these 
> tuples are not inherently comparable, so when you want to sort a collection 
> of tuples, you have to provide a custom comparator.
> I created a tuple sorting class, as follows. (Only the methods for Tuple2 are 
> defined at the bottom; similar methods could be added for other tuple types.) 
> I wanted to get feedback on whether something like this would be considered 
> useful for Flink before I submitted a PR.
> {code}
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.Collections;
> import java.util.Comparator;
> import java.util.List;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> /** A class for sorting collections of tuples. */
> public class TupleSorter {
>     /**
>      * Produce a Tuple comparator for the given number of fields, with the requested field
>      * priority and sort order.
>      *
>      * @param tupleLen
>      *            The number of fields in the tuples to be compared.
>      * @param fieldPriority
>      *            A permutation of the field indices 0 .. tupleLen - 1; the first entry is the
>      *            primary sort key, the second the secondary sort key, and so on.
>      * @param sortDescendingIndices
>      *            The indices of the fields to sort in descending rather than ascending order.
>      *            May be null or empty, in which case every field sorts in ascending order.
>      * @return A comparator that compares the fields in priority order and returns the first
>      *         non-zero field comparison, or 0 if all fields compare equal.
>      * @throws IllegalArgumentException
>      *             If fieldPriority is null, is not of length tupleLen, or contains an
>      *             out-of-range or duplicated index, or if sortDescendingIndices contains an
>      *             out-of-range index.
>      */
>     private static Comparator<Tuple> newComparator(final int tupleLen, final int[] fieldPriority,
>             final int[] sortDescendingIndices) {
>         if (fieldPriority == null || fieldPriority.length != tupleLen) {
>             throw new IllegalArgumentException("Invalid sort order");
>         }
>         // Snapshot the caller's array so that later mutation cannot change the comparator.
>         final int[] priority = fieldPriority.clone();
>         final boolean[] idxUsed = new boolean[tupleLen];
>         for (final int idx : priority) {
>             if (idx < 0 || idx >= tupleLen) {
>                 throw new IllegalArgumentException("fieldPriority entry out of range: " + idx);
>             }
>             if (idxUsed[idx]) {
>                 throw new IllegalArgumentException("fieldPriority entry duplicated: " + idx);
>             }
>             idxUsed[idx] = true;
>         }
>         // NOTE: this array is indexed by FIELD index, not by position in the priority list.
>         final boolean[] sortDescending = new boolean[tupleLen];
>         if (sortDescendingIndices != null) {
>             for (final int idx : sortDescendingIndices) {
>                 if (idx < 0 || idx >= tupleLen) {
>                     throw new IllegalArgumentException("sortDescendingIndices entry out of range: " + idx);
>                 }
>                 sortDescending[idx] = true;
>             }
>         }
>         return (tuple0, tuple1) -> {
>             for (final int idx : priority) {
>                 // Look the flag up by field index so that it matches how it was populated above.
>                 final int diff = compareField(tuple0, tuple1, idx, sortDescending[idx]);
>                 if (diff != 0) {
>                     return diff;
>                 }
>             }
>             return 0;
>         };
>     }
>
>     /**
>      * Compare one field of two tuples, optionally in descending order.
>      *
>      * <p>Descending order is obtained by swapping the operands rather than negating the result,
>      * because negating Integer.MIN_VALUE yields Integer.MIN_VALUE again.
>      *
>      * @param tuple0
>      *            The left-hand tuple.
>      * @param tuple1
>      *            The right-hand tuple.
>      * @param idx
>      *            The index of the field to compare.
>      * @param descending
>      *            If true, the comparison is reversed.
>      * @return The result of compareTo on the selected field, reversed if requested.
>      */
>     @SuppressWarnings({ "rawtypes", "unchecked" })
>     private static int compareField(final Tuple tuple0, final Tuple tuple1, final int idx,
>             final boolean descending) {
>         final Comparable value0 = (Comparable) tuple0.getField(idx);
>         final Comparable value1 = (Comparable) tuple1.getField(idx);
>         return descending ? value1.compareTo(value0) : value0.compareTo(value1);
>     }
>
>     /**
>      * Sort a list of tuples in place.
>      *
>      * @param list
>      *            The list of tuples.
>      * @param fieldPriority
>      *            The sort priority for the fields (primary and secondary sort key): a permutation
>      *            of the field indices 0 and 1. The default sort order within a field is ascending.
>      * @param sortDescendingIndices
>      *            If provided, inverts the sort order for each given field index from ascending to
>      *            descending order.
>      * @throws IllegalArgumentException
>      *             If fieldPriority is not a permutation of 0 and 1, or if sortDescendingIndices
>      *             contains an index other than 0 or 1.
>      */
>     public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> void sort(
>             final List<Tuple2<T0, T1>> list, final int[] fieldPriority,
>             final int... sortDescendingIndices) {
>         list.sort(newComparator(/* tupleLen = */ 2, fieldPriority, sortDescendingIndices));
>     }
>
>     /**
>      * Produce a sorted copy of a collection of tuples. The input collection is not modified.
>      *
>      * @param collection
>      *            The collection of tuples.
>      * @param fieldPriority
>      *            The sort priority for the fields (primary and secondary sort key): a permutation
>      *            of the field indices 0 and 1. The default sort order within a field is ascending.
>      * @param sortDescendingIndices
>      *            If provided, inverts the sort order for each given field index from ascending to
>      *            descending order.
>      * @return A new list containing the elements of the collection in sorted order.
>      * @throws IllegalArgumentException
>      *             If fieldPriority is not a permutation of 0 and 1, or if sortDescendingIndices
>      *             contains an index other than 0 or 1.
>      */
>     public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> ArrayList<Tuple2<T0, T1>> sortCopy(
>             final Collection<Tuple2<T0, T1>> collection, final int[] fieldPriority,
>             final int... sortDescendingIndices) {
>         ArrayList<Tuple2<T0, T1>> list = new ArrayList<>(collection);
>         Collections.sort(list, newComparator(/* tupleLen = */ 2, fieldPriority, sortDescendingIndices));
>         return list;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to