Running `ds.map(blahblah).sortPartition(blahblah).mapPartition(blahblah).distinct()` does not change the DataSet `ds` itself. Each transformation returns a new DataSet, so you have to keep the returned result and call `collect()` on that.
So if you modify the code to `intermediateResult = blahblah; result = intermediateResult.collect();`, the test works.

Regards,
Chiwan Park

> On Jun 25, 2015, at 10:03 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
>
> Hi,
>
> I worked on rewriting flink-test according to
> https://issues.apache.org/jira/browse/FLINK-2275
>
> In "org.apache.flink.test.javaApiOperators.SortPartitionITCase" I hit
> something strange. When I rewrite the code slightly differently, the
> test passes or fails and I have no idea why.
>
> The following code works (result is of type java.util.List)
>
>> result = ds
>> .map(new IdMapper()).setParallelism(4) // parallelize input
>> .sortPartition(1, Order.DESCENDING)
>> .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new
>> Tuple3Checker()))
>> .distinct().collect();
>
> Rewriting the above as follows result in a failing test:
>
>> ds.map(new IdMapper()).setParallelism(4) // parallelize input
>> .sortPartition(1, Order.DESCENDING)
>> .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new
>> Tuple3Checker()))
>> .distinct();
>> result = ds.collect();
>
> I have no clue what the problem might be. The code looks semantically
> identical to me. Can anyone explain the difference? Do I miss anything?
> Or is this a bug?
>
> You can find the working version of the code in my github repo:
> https://github.com/mjsax/flink/tree/flink-2275-migrateFlinkTests
>
>
> -Matthias
>
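P.S. To make the point about discarded transformation results concrete, here is a minimal sketch that runs without Flink. `MiniDataSet` is a hypothetical stand-in written only for this illustration; it is not Flink's DataSet class and mirrors only the one property that matters here, namely that a transformation returns a new object and leaves the receiver untouched. The input values and the `x / 2` mapper are likewise made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Function;

class ImmutableTransformDemo {

    // Hypothetical stand-in for a data set (NOT Flink's DataSet class).
    // Every transformation returns a new instance; the receiver is never modified.
    static final class MiniDataSet<T> {
        private final List<T> elements;

        MiniDataSet(List<T> elements) {
            this.elements = new ArrayList<>(elements); // defensive copy
        }

        <R> MiniDataSet<R> map(Function<T, R> f) {
            List<R> out = new ArrayList<>();
            for (T e : elements) {
                out.add(f.apply(e));
            }
            return new MiniDataSet<>(out);
        }

        MiniDataSet<T> distinct() {
            // LinkedHashSet drops duplicates and keeps first-seen order.
            return new MiniDataSet<>(new ArrayList<>(new LinkedHashSet<>(elements)));
        }

        List<T> collect() {
            return new ArrayList<>(elements);
        }
    }

    // Same shape as the failing test: the transformed data set is thrown away.
    static List<Integer> discardingResult() {
        MiniDataSet<Integer> ds = new MiniDataSet<>(Arrays.asList(1, 2, 3, 4));
        ds.map(x -> x / 2).distinct(); // return value is ignored
        return ds.collect();           // still the original elements
    }

    // Same shape as the passing test: keep the returned data set, then collect.
    static List<Integer> keepingResult() {
        MiniDataSet<Integer> ds = new MiniDataSet<>(Arrays.asList(1, 2, 3, 4));
        MiniDataSet<Integer> intermediateResult = ds.map(x -> x / 2).distinct();
        return intermediateResult.collect();
    }

    public static void main(String[] args) {
        System.out.println(discardingResult()); // prints [1, 2, 3, 4]
        System.out.println(keepingResult());    // prints [0, 1, 2]
    }
}
```

The first method collects the untransformed input, which is why the order check in the second variant of the test never sees the sorted and mapped data.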