Thanks to Aljoscha and Stefano for pointing out the flaw. We corrected the issue as follows:
[CODE]
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...
public static void main(String[] args) throws Exception {
    if (!parseParameters(args)) {
        return;
    }
    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // get input data
    DataSet<String> text = getTextDataSet(env);
    DataSet<Tuple4<Integer, String, String, Integer>> counts =
        // split up the lines into 4-tuples containing: (timestamp, uuid, event, count)
        text.flatMap(new SelectDataFlatMap())
            // group by tuple field 2 (the event, a String) and sum up tuple field 3 (the Integer count, always 1)
            .groupBy(2)
            .sum(3);
    // emit result
    if (fileOutput) {
        counts.writeAsCsv(outputPath, "\n", " ");
    } else {
        counts.print();
    }
    // execute program
    env.execute("Weblogs Programme");
}
...
public static class SelectDataFlatMap
        extends JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record)
            throws Exception {
        try {
            record.collect(new Tuple4<Integer, String, String, Integer>(
                getInt(value, "timestamp"),
                getString(value, "uuid"),
                getString(value, "event"),
                1));
        } catch (JSONException e) {
            System.err.println("Field not found");
        }
    }
}
[/CODE]
This time, however, the problem is different. The program runs through to status FINISHED, but there is no output :-(( i.e. for each TaskManager, an empty file is written. When we looked more closely at the input text file read with env.readTextFile(), we found that instead of the text (the full dataset), only a short string is written.
Something like:
org.apache.flink.api.java.operators.DataSource@6bd8b476
Worse still, this string sometimes stays the same across multiple runs of the program. Is this normal? Is it just a handle to the file or to the dataset? And is the Collector working correctly as well?
Note: the actual JSON file (i.e. the text file that should be read) looks like this, with a 2-level hierarchy for one field:
[JSON]
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
[/JSON]
So once again we are unsure whether we are doing this correctly :-((
Thanks in advance for helping us understand where we are going wrong.
Anirvan

----- Original Message -----
> From: "Stefano Bortoli" <s.bort...@gmail.com>
> To: "user" <u...@flink.incubator.apache.org>
> Cc: dev@flink.incubator.apache.org
> Sent: Tuesday, November 25, 2014 5:05:34 PM
> Subject: Re: Program crashes trying to read JSON file
>
> Very quickly, it seems you are trying to sum on Strings:
> Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
> Check your tuple types and be sure that you are not summing on strings.
>
> 2014-11-25 16:55 GMT+01:00 Anirvan BASU <anirvan.b...@inria.fr>:
> > Hello all,
> > We are using Flink 0.7 and trying to read a large JSON file, reading some fields into a Flink (3-tuple based) dataset, then performing some operations.
> > We encountered the following runtime error:
> > [QUOTE]
> > Error: The main method caused an error.
> > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
> >     at org.apache.flink.client.program.Client.run(Client.java:244)
> >     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> > Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
> >     at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
> >     at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
> >     at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
> >     at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
> >     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:606)
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
> >     ... 6 more
> > [/QUOTE]
> > The code snippet that could have caused this error (i.e.
> > that we edited) is the following:
> > [CODE]
> > import org.apache.flink.api.java.tuple.Tuple3;
> > import org.apache.flink.util.Collector;
> > import org.apache.flink.api.java.DataSet;
> > import org.apache.flink.api.java.ExecutionEnvironment;
> > import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> > import org.apache.sling.commons.json.JSONException;
> > ...
> > public static void main(String[] args) throws Exception {
> >     if (!parseParameters(args)) {
> >         return;
> >     }
> >     // set up the execution environment
> >     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >     // get input data
> >     DataSet<String> text = getTextDataSet(env);
> >     DataSet<Tuple3<Integer, String, String>> counts =
> >         // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event)
> >         text.flatMap(new SelectDataFlatMap())
> >             // group by the tuple field "0" and sum up tuple field "1"
> >             .groupBy(2)
> >             .sum(2);
> >     // emit result
> >     if (fileOutput) {
> >         counts.writeAsCsv(outputPath, "\n", " ");
> >     } else {
> >         counts.print();
> >     }
> >     // execute program
> >     env.execute("Weblogs Programme");
> > }
> > ...
> > public static class SelectDataFlatMap extends
> >         JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
> >     @Override
> >     public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
> >             throws Exception {
> >         try {
> >             out.collect(new Tuple3<Integer, String, String>(
> >                 getInt(value, "timestamp"),
> >                 getString(value, "uuid"),
> >                 getString(value, "event")));
> >         } catch (JSONException e) {
> >             System.err.println("Field not found");
> >         }
> >     }
> > }
> > [/CODE]
> > The JSON file is of the following nature, with a 2-level hierarchy for one field:
> > [JSON]
> > {timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
> > {timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
> > [/JSON]
> > Thanks in advance for helping us to understand where we are going wrong.
> > Anirvan
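The two open questions in this thread (why printing the dataset shows something like DataSource@6bd8b476, and why the job reaches FINISHED with empty output files) can be reasoned about with a small model. The sketch below is plain Java and deliberately does not use Flink, so it is not the Flink API; LazySource, lookup, extract, groupAndSum, record and sampleInput are hypothetical names, the uuid values are placeholders, and the third input line is made up. It rests on three assumptions: that a DataSet object is a lazy handle whose toString() is not overridden, so printing it shows ClassName@identityHash (a hash code, not a checksum of the contents, which can well be identical from run to run); that the real input lines are valid JSON (the sample as pasted has no quotes or commas, which may just be a mail-formatting artifact); and that, because uuid and event sit under payload, a top-level lookup of "uuid" fails on every line and the catch block silently drops it, so the Collector never receives anything. Whether JSONParseFlatMap accepts a nested key written as "payload.uuid" is itself an assumption to check against the Flink 0.7 JSONParser documentation. Tuple positions are 0-based, so in (timestamp, uuid, event, count) the event is position 2 and the count position 3; grouping on position 1 would count per uuid instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the pipeline in this thread; it does NOT use Flink. All names are hypothetical.
public class WeblogSketch {

    // Hypothetical stand-in for a lazy DataSet: it only describes where data will come from.
    static final class LazySource {
        final String path;
        LazySource(String path) { this.path = path; }
        // No toString() override, so printing yields Object.toString(): ClassName@hexIdentityHash.
    }

    // Resolves a dotted path such as "payload.uuid" in a nested map; throws if any key is missing.
    static Object lookup(Map<String, Object> record, String path) {
        Object current = record;
        for (String key : path.split("\\.")) {
            if (!(current instanceof Map) || !((Map<?, ?>) current).containsKey(key)) {
                throw new IllegalArgumentException("Field not found: " + path);
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    // Mimics SelectDataFlatMap: emits (timestamp, uuid, event, 1) or logs and drops the record.
    static List<Object[]> extract(List<Map<String, Object>> records, String uuidPath, String eventPath) {
        List<Object[]> out = new ArrayList<>();
        for (Map<String, Object> r : records) {
            try {
                out.add(new Object[] {lookup(r, "timestamp"), lookup(r, uuidPath), lookup(r, eventPath), 1});
            } catch (IllegalArgumentException e) {
                System.err.println("Field not found"); // swallowed: nothing is emitted for this line
            }
        }
        return out;
    }

    // Models groupBy(keyField).sum(3) on the 0-based positions of (timestamp, uuid, event, count).
    static Map<String, Integer> groupAndSum(List<Object[]> rows, int keyField) {
        Map<String, Integer> sums = new TreeMap<>(); // sorted keys, for a deterministic printout
        for (Object[] row : rows) {
            sums.merge(String.valueOf(row[keyField]), (Integer) row[3], Integer::sum);
        }
        return sums;
    }

    // Builds one record shaped like the sample lines: uuid and event live under "payload".
    static Map<String, Object> record(int timestamp, String uuid, String event) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("uuid", uuid);
        payload.put("event", event);
        Map<String, Object> r = new HashMap<>();
        r.put("timestamp", timestamp);
        r.put("payload", payload);
        return r;
    }

    static List<Map<String, Object>> sampleInput() {
        List<Map<String, Object>> in = new ArrayList<>();
        in.add(record(1397731764, "uuid-A", "General,Login,Success"));
        in.add(record(1397731765, "uuid-B", "General,App,Opened"));
        in.add(record(1397731766, "uuid-A", "General,App,Opened")); // made-up third line
        return in;
    }

    public static void main(String[] args) {
        System.out.println(new LazySource("weblogs.json")); // prints a handle, not the data
        System.out.println(extract(sampleInput(), "uuid", "event").size());
        System.out.println(groupAndSum(extract(sampleInput(), "payload.uuid", "payload.event"), 2));
    }
}
```

Running main prints a WeblogSketch$LazySource@... line, then 0 (with three "Field not found" messages on stderr), then {General,App,Opened=2, General,Login,Success=1}. The point of the model is that "job FINISHED, empty files" is exactly what a flatMap produces when every record takes the catch branch.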