Although you run 
`ds.map(blahblah).sortPartition(blahblah).mapPartition(blahblah).distinct()`, 
DataSet ds is not changed.
You should receive the result of transformation.

So if you modify the code to `intermediateResult = blahblah; result = 
intermediateResult.collect();`, the test works.

Regards,
Chiwan Park

> On Jun 25, 2015, at 10:03 AM, Matthias J. Sax <mj...@informatik.hu-berlin.de> 
> wrote:
> 
> Hi,
> 
> I worked on rewriting flink-test according to
> https://issues.apache.org/jira/browse/FLINK-2275
> 
> In "org.apache.flink.test.javaApiOperators.SortPartitionITCase" I hit
> something strange. When I rewrite the code slightly differently, the
> test passes or fails and I have no idea why.
> 
> The following code works (result is of type java.util.List)
> 
>> result = ds
>>      .map(new IdMapper()).setParallelism(4) // parallelize input
>>      .sortPartition(1, Order.DESCENDING)
>>      .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new 
>> Tuple3Checker()))
>>      .distinct().collect();
> 
> Rewriting the above as follows result in a failing test:
> 
>> ds.map(new IdMapper()).setParallelism(4) // parallelize input
>>      .sortPartition(1, Order.DESCENDING)
>>      .mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new 
>> Tuple3Checker()))
>>      .distinct();
>> result = ds.collect();
> 
> I have no clue what the problem might be. The code looks semantically
> identical to me. Can anyone explain the difference? Do I miss anything?
> Or is this a bug?
> 
> You can find the working version of the code in my github repo:
> https://github.com/mjsax/flink/tree/flink-2275-migrateFlinkTests
> 
> 
> -Matthias
> 





Reply via email to