Hi Fabian,

I commented on the issue and attached the program reproducing the bug, But
I couldn't find how to re-open it (I think maybe I don't have enough
permissions?).

Best,
Yassine


2016-10-25 12:49 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Yassine,
>
> I thought I had fixed that bug a few weeks a ago, but apparently the fix
> did not catch all cases.
> Can you please reopen FLINK-2662 and post the program to reproduce the bug
> there?
>
> Thanks,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-2662
>
> 2016-10-25 12:33 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Hi all,
>>
>> My job fails with the folowing exception : CompilerException: Bug: Plan
>> generation for Unions picked a ship strategy between binary plan operators.
>> The exception happens when adding partitionByRange(1).sortPartition(1,
>> Order.DESCENDING) to the union of datasets.
>>
>> I made a smaller version that reproduces the bug :
>>
>> import org.apache.flink.api.common.functions.GroupReduceFunction;
>> import org.apache.flink.api.common.operators.Order;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.util.Collector;
>> import java.util.Iterator;
>>
>> public class BugReproduce {
>>     public static void main(String[] args) throws Exception {
>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecut
>> ionEnvironment();
>>         DataSet<WC> wc1 = env.fromElements(new WC("first",1), new
>> WC("second",2),new WC("first",1),new WC("first",1),new WC("second",2));
>>         DataSet<WC> wc2 = env.fromElements(new WC("third",1), new
>> WC("forth",2),new WC("forth",1),new WC("forth",1),new WC("third",2));
>>         DataSet<WC> wc3 = env.fromElements(new WC("fifth",1), new
>> WC("fifth",2),new WC("fifth",1),new WC("fifth",1),new WC("fifth",2));
>>
>>         DataSet<Tuple2<String,Integer>> aggregatedwc1 = aggregateWC(wc1);
>>         DataSet<Tuple2<String,Integer>> aggregatedwc2 = aggregateWC(wc2);
>>         DataSet<Tuple2<String,Integer>> aggregatedwc3 = aggregateWC(wc3);
>>         DataSet<Tuple2<String,Integer>> all =
>> aggregatedwc1.union(aggregatedwc2).union(aggregatedwc3);
>>         all.partitionByRange(1).sortPartition(1,
>> Order.DESCENDING).print();
>>
>>     }
>>
>>     public static DataSet<Tuple2<String,Integer>>
>> aggregateWC(DataSet<WC> input){
>>         return input.groupBy("word").reduceGroup(new
>> GroupReduceFunction<WC, Tuple2<String, Integer>>() {
>>             @Override
>>             public void reduce(Iterable<WC> iterable,
>> Collector<Tuple2<String, Integer>> collector) throws Exception {
>>                 Integer count = 0;
>>                 Iterator<WC> iterator = iterable.iterator();
>>                 if (iterator.hasNext()) {
>>                     String word= iterator.next().word;
>>                     while (iterator.hasNext()) {
>>                         iterator.next();
>>                         count += 1;
>>                     }
>>                     collector.collect(Tuple2.of(word,count));
>>                 }
>>             }
>>         });
>>     }
>>
>>     public static class WC {
>>         public String word;
>>         public int count;
>>
>>         public WC() {
>>         }
>>
>>         public WC(String word, int count) {
>>             this.word = word;
>>             this.count = count;
>>         }
>>
>>         public String getWord() {
>>             return word;
>>         }
>>
>>         public void setWord(String word) {
>>             this.word = word;
>>         }
>>
>>         public int getCount() {
>>             return count;
>>         }
>>
>>         public void setCount(int count) {
>>             this.count = count;
>>         }
>>     }
>> }
>>
>> Here is the exception stacktrace:
>>
>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>> Bug: Plan generation for Unions picked a ship strategy between binary plan
>> operators.
>> at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.
>> collect(BinaryUnionReplacer.java:113)
>> at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.
>> postVisit(BinaryUnionReplacer.java:72)
>> at org.apache.flink.optimizer.traversals.BinaryUnionReplacer.
>> postVisit(BinaryUnionReplacer.java:41)
>> at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(Dua
>> lInputPlanNode.java:170)
>> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(
>> SingleInputPlanNode.java:199)
>> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(
>> SingleInputPlanNode.java:199)
>> at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(
>> SingleInputPlanNode.java:199)
>> at org.apache.flink.optimizer.plan.OptimizedPlan.accept(Optimiz
>> edPlan.java:128)
>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>> at org.apache.flink.client.LocalExecutor.executePlan(LocalExecu
>> tor.java:185)
>> at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvi
>> ronment.java:91)
>> at org.apache.flink.api.java.ExecutionEnvironment.execute(Execu
>> tionEnvironment.java:896)
>> at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>> at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>> at org.myorg.prod.BugReproduce.main(BugReproduce.java:28)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>
>> I'm using Flink v1.1.3. Any help is appreciated. Thank you.
>>
>> Best,
>> Yassine
>>
>
>

Reply via email to