Yes, makes sense to add the HBase fix to that.

On Tue, Nov 25, 2014 at 10:55 AM, Flavio Pompermaier <[email protected]> wrote:
> Yes, because Stefano is working on the stable version. I saw that you are
> going to release the 7.1 version; do you think you can also include the new
> HBase addon (the one that generates Tuples)?
>
> On Tue, Nov 25, 2014 at 10:31 AM, Stefano Bortoli <[email protected]>
> wrote:
>
>> Yes, I am using the Record data type. I can move the implementation to
>> Tuple if that is what is needed.
>>
>> Thanks for the tip! :-)
>>
>> Regards,
>> Stefano
>>
>> 2014-11-25 10:29 GMT+01:00 Stephan Ewen <[email protected]>:
>>
>>> I just had a look at this.
>>>
>>> Are you using the "Record" data type? The tooling for that one does not
>>> seem to support this right now, but it is an easy fix...
>>>
>>> On 25.11.2014 09:48, "Stefano Bortoli" <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to run this code:
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>         ExecutionEnvironment env = ExecutionEnvironment
>>>>                 .getExecutionEnvironment();
>>>>
>>>>         MyTableInputFormat inputFormat = new MyTableInputFormat();
>>>>
>>>>         DataSource<Record> dataset = env.createInput(inputFormat);
>>>>
>>>>         DataSet<Tuple4<StringValue, StringValue, StringValue, BooleanValue>> candidates = dataset
>>>>                 .filter(new EmptyEntityFilterFunction()).rebalance()
>>>>                 .flatMap(new FindCandidateWithMatchFlagMapFunction<>());
>>>>
>>>>         DataSet<Tuple3<StringValue, StringValue, StringValue>> duplicates = candidates
>>>>                 .filter(new SingleMatchFilterFunctionWithFlagMatch<>()).map(
>>>>                         new MapToTuple3MapFunction<>());
>>>>
>>>>         DataSet<Tuple2<StringValue, MyStringList>> duplicatesToprint = duplicates
>>>>                 .distinct(0, 1)
>>>>                 .groupBy(0)
>>>>                 .reduceGroup(
>>>>                         new ConsolidateByTypeDuplicatesGroupReduceFunction());
>>>>
>>>>         duplicatesToprint.writeAsText("file:///tmp/"
>>>>                 + EnsMaintenanceConstants.WORKING_TABLE + "/",
>>>>                 WriteMode.OVERWRITE);
>>>>
>>>>         env.execute();
>>>>     }
>>>>
>>>> but it fails right away with this exception.
>>>> The API documentation says
>>>> that rebalance can be used as input of map functions. It is not clear to me
>>>> what I am doing wrong, unless rebalancing is actually illegal. In that
>>>> case, it should not be available in the API, I guess :-)
>>>>
>>>> Please let me know how I could use rebalance.
>>>>
>>>> Error: java.lang.Exception: Failed to deploy the task CHAIN DataSource
>>>> (org.okkam.flink.hbase.MyTableInputFormat@4259448a) -> Filter
>>>> (org.okkam.flink.hbase.EmptyEntityFilterFunction) (1/24) - execution #0 to
>>>> slot SubSlot 0 (7536af39024636d81e3e05782a701bde (3) - ALLOCATED/ALIVE):
>>>> java.lang.RuntimeException: The initialization of the DataSource's outputs
>>>> caused an error: Invalid shipping strategy for OutputEmitter:
>>>> PARTITION_FORCED_REBALANCE
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:92)
>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:180)
>>>>     at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:594)
>>>>     at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>     at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
>>>>     at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)
>>>> Caused by: java.lang.IllegalArgumentException: Invalid shipping
>>>> strategy for OutputEmitter: PARTITION_FORCED_REBALANCE
>>>>     at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:99)
>>>>     at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:69)
>>>>     at org.apache.flink.runtime.operators.shipping.RecordOutputEmitter.<init>(RecordOutputEmitter.java:58)
>>>>     at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1245)
>>>>     at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1338)
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:327)
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:90)
>>>>     ... 7 more
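For readers hitting the same error: PARTITION_FORCED_REBALANCE is the shipping strategy that rebalance() asks for, and the trace shows that RecordOutputEmitter, used on the Record code path, rejects it in its constructor. Conceptually, a forced rebalance deals records out round-robin over the downstream parallel instances, so that a few oversized input splits do not overload a few tasks. The plain-Java sketch below illustrates only that idea. It is not Flink code: the names RoundRobinRebalanceSketch, RoundRobinSelector and rebalance are hypothetical, and it assumes each sending task keeps its own rotating channel counter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, plain-Java illustration (not Flink code) of a forced
 * rebalance: every sending task deals its records round-robin over
 * the downstream channels, evening out skewed input partitions.
 */
public class RoundRobinRebalanceSketch {

    /** Hypothetical channel selector: cycles through channels 0..numChannels-1. */
    public static final class RoundRobinSelector {
        private final int numChannels;
        private int next = 0;

        public RoundRobinSelector(int numChannels) {
            if (numChannels <= 0) {
                throw new IllegalArgumentException("numChannels must be positive");
            }
            this.numChannels = numChannels;
        }

        /** Returns the channel for the next record and advances the counter. */
        public int selectChannel() {
            int channel = next;
            next = (next + 1) % numChannels;
            return channel;
        }
    }

    /**
     * Redistributes the records of each input partition (one per sending
     * task) over numChannels outputs. Each sender uses its own selector,
     * which always starts at channel 0 in this simplified sketch.
     */
    public static <T> List<List<T>> rebalance(List<List<T>> inputPartitions, int numChannels) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            outputs.add(new ArrayList<>());
        }
        for (List<T> partition : inputPartitions) {
            RoundRobinSelector selector = new RoundRobinSelector(numChannels);
            for (T record : partition) {
                outputs.get(selector.selectChannel()).add(record);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Skewed input: one split returns almost all rows, one is empty.
        List<List<String>> skewed = Arrays.asList(
                Arrays.asList("r1", "r2", "r3", "r4", "r5", "r6", "r7"),
                Arrays.asList("r8"),
                new ArrayList<String>());
        List<List<String>> balanced = rebalance(skewed, 3);
        for (int i = 0; i < balanced.size(); i++) {
            System.out.println("channel " + i + ": " + balanced.get(i));
        }
    }
}
```

Running main prints "channel 0: [r1, r4, r7, r8]", "channel 1: [r2, r5]" and "channel 2: [r3, r6]": the seven-record split ends up spread over all three channels instead of feeding a single one. Starting every sender at channel 0 is a simplification; a real implementation may pick a different starting offset per sender.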
