I just had a look at this. Are you using the "Record" data type? Its runtime utilities do not seem to support rebalance right now, but it is an easy fix...

On 25.11.2014 09:48, "Stefano Bortoli" <s.bort...@gmail.com> wrote:
> Hi,
>
> I am trying to run this code:
>
> public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment
>             .getExecutionEnvironment();
>
>     MyTableInputFormat inputFormat = new MyTableInputFormat();
>
>     DataSource<Record> dataset = env.createInput(inputFormat);
>
>     DataSet<Tuple4<StringValue, StringValue, StringValue, BooleanValue>> candidates = dataset
>             .filter(new EmptyEntityFilterFunction()).rebalance()
>             .flatMap(new FindCandidateWithMatchFlagMapFunction<>());
>
>     DataSet<Tuple3<StringValue, StringValue, StringValue>> duplicates = candidates
>             .filter(new SingleMatchFilterFunctionWithFlagMatch<>())
>             .map(new MapToTuple3MapFunction<>());
>
>     DataSet<Tuple2<StringValue, MyStringList>> duplicatesToprint = duplicates
>             .distinct(0, 1)
>             .groupBy(0)
>             .reduceGroup(new ConsolidateByTypeDuplicatesGroupReduceFunction());
>
>     duplicatesToprint.writeAsText("file:///tmp/"
>             + EnsMaintenanceConstants.WORKING_TABLE + "/",
>             WriteMode.OVERWRITE);
>
>     env.execute();
> }
>
> but it fails right away with this exception. In the API it is written that
> rebalance can be used as input of map functions. It is not clear to me what
> I am doing wrong, unless rebalancing is actually illegal. In this case, it
> should not be available as API I guess :-)
>
> please let me know how I could use rebalance.
>
> Error: java.lang.Exception: Failed to deploy the task CHAIN DataSource (org.okkam.flink.hbase.MyTableInputFormat@4259448a) -> Filter (org.okkam.flink.hbase.EmptyEntityFilterFunction) (1/24) - execution #0 to slot SubSlot 0 (7536af39024636d81e3e05782a701bde (3) - ALLOCATED/ALIVE): java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
>     at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:92)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:180)
>     at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
>     at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
>     at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
> Caused by: java.lang.IllegalArgumentException: Invalid shipping strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
>     at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:99)
>     at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:69)
>     at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:58)
>     at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1245)
>     at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1338)
>     at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:327)
>     at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:90)
>     ... 7 more
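
For context on what the rejected shipping strategy is supposed to do: rebalance hands records to the parallel instances of the next operator in round-robin order, independent of the record contents. Below is a minimal plain-Java sketch of that idea. It is not Flink's code and does not use any Flink classes; the names RoundRobinRebalanceSketch, RoundRobinSelector and rebalance are made up for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of round-robin redistribution (not Flink code).
class RoundRobinRebalanceSketch {

    /** Picks output channels in strict rotation, ignoring record contents. */
    static final class RoundRobinSelector {
        private final int numChannels;
        private int next = 0;

        RoundRobinSelector(int numChannels) {
            if (numChannels <= 0) {
                throw new IllegalArgumentException("numChannels must be positive");
            }
            this.numChannels = numChannels;
        }

        /** Returns the channel for the next record, then advances the rotation. */
        int selectChannel() {
            int channel = next;
            next = (next + 1) % numChannels;
            return channel;
        }
    }

    /** Distributes the records over numChannels lists in round-robin order. */
    public static <T> List<List<T>> rebalance(List<T> records, int numChannels) {
        RoundRobinSelector selector = new RoundRobinSelector(numChannels);
        List<List<T>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<T>());
        }
        for (T record : records) {
            channels.get(selector.selectChannel()).add(record);
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        List<List<String>> channels = rebalance(records, 3);
        for (int i = 0; i < channels.size(); i++) {
            System.out.println("channel " + i + ": " + channels.get(i));
        }
    }
}
```

With seven records and three channels, the channel sizes differ by at most one, which is the point of using rebalance after a filter that may leave the source partitions skewed.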