Very quickly: it seems you are trying to sum on Strings.

Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
Check your tuple types and make sure that you are not summing on Strings.

2014-11-25 16:55 GMT+01:00 Anirvan BASU <anirvan.b...@inria.fr>:

> Hello all,
>
> We are using Flink 0.7 and trying to read a large JSON file, reading some
> fields into a Flink (3-tuple based) dataset, then performing some
> operations.
>
> We encountered the following runtime error:
>
> [QUOTE]
> Error: The main method caused an error.
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>     at org.apache.flink.client.program.Client.run(Client.java:244)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
>     at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
>     at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
>     at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
>     at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
>     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>     ... 6 more
> [/QUOTE]
>
> The code snippet that could have caused this error (i.e. that we edited)
> is the following:
>
> [CODE]
>
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> import org.apache.sling.commons.json.JSONException;
> ...
>
> public static void main(String[] args) throws Exception {
>
>     if(!parseParameters(args)) {
>         return;
>     }
>
>     // set up the execution environment
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>     // get input data
>     DataSet<String> text = getTextDataSet(env);
>
>     DataSet<Tuple3<Integer, String, String>> counts =
>         // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event)
>         text.flatMap(new SelectDataFlatMap())
>         // group by the tuple field "0" and sum up tuple field "1"
>         .groupBy(2)
>         .sum(2);
>
>     // emit result
>     if(fileOutput) {
>         counts.writeAsCsv(outputPath, "\n", " ");
>     } else {
>         counts.print();
>     }
>
>     // execute program
>     env.execute("Weblogs Programme");
> }
>
> ...
>
> public static class SelectDataFlatMap extends
>         JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
>
>     @Override
>     public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
>             throws Exception {
>         try {
>             out.collect(new Tuple3<Integer, String, String>(
>                 getInt(value, "timestamp"),
>                 getString(value, "uuid"),
>                 getString(value, "event")));
>         } catch (JSONException e) {
>             System.err.println("Field not found");
>         }
>     }
> }
>
> [/CODE]
>
> The JSON file is of the following nature, with a 2-level hierarchy for one
> field:
>
> [JSON]
> {timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
> {timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
> [/JSON]
>
> Thanks in advance for helping us to understand where we are going wrong.
>
> Anirvan
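
To make the diagnosis at the top of this reply concrete: in the quoted program, sum(2) targets the third field of Tuple3<Integer, String, String>, which is the String "event", and the built-in sum only works on numeric fields. If the goal is to count records per event (an assumption here; the original message does not state it), the summed field has to be numeric, for instance by emitting (event, 1) pairs and then calling groupBy(0).sum(1). The block below is only a plain-Java sketch of that logic, without any Flink dependency; EventCounts and countByEvent are hypothetical names, not part of Flink or of the original program.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not Flink API.
class EventCounts {

    // Counts how often each event string occurs. This mirrors the shape
    // (event, 1) -> group by event -> sum the Integer field.
    static Map<String, Integer> countByEvent(List<String> events) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String event : events) {
            // The value being summed is an Integer (1 per record),
            // never the String key itself.
            counts.merge(event, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Event values taken from the sample records in the question,
        // with the first one repeated to show a count above 1.
        List<String> events = Arrays.asList(
                "General,Login,Success",
                "General,App,Opened",
                "General,Login,Success");

        // Prints one "event count" line per distinct event.
        for (Map.Entry<String, Integer> e : countByEvent(events).entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

The point of the sketch is that the grouping key may be a String, but the aggregated field must be a number. In the Flink job this would probably mean changing the tuple type so that the field passed to sum() is an Integer or Long.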