You can output your results in different ways. If all you need is to write
a file, I normally use the writeAsText method (however, there are also
writeAsCsv and writeAsFormattedText, or you can write according to your
own custom FileOutputFormat).
datasetToPrint.writeAsText("/path/to/file/with/permission",
WriteMode.OVERWRITE);
Keep in mind that this will output your tuple dataset as-is. Therefore, if
you want to shape your output differently, further processing may be
necessary.
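As a sketch of what that further processing could look like (plain Java so it stands alone; the class and method names here are hypothetical, not part of the Flink API), you can format each (timestamp, uuid, event, count) 4-tuple into a single output line, and then hand the resulting strings to writeAsText:

```java
// A minimal sketch of "further processing" before writing: turn each
// (timestamp, uuid, event, count) 4-tuple into one tab-separated line.
// Plain Java here so the example stands alone; in the Flink job you would
// put formatRecord inside a MapFunction<Tuple4<Integer, String, String,
// Integer>, String> and call writeAsText on the resulting DataSet<String>.
public class TupleFormatter {

    // Tab-separated, since the "event" field itself contains commas.
    static String formatRecord(int timestamp, String uuid, String event, int count) {
        return timestamp + "\t" + uuid + "\t" + event + "\t" + count;
    }

    public static void main(String[] args) {
        System.out.println(formatRecord(1397731764, "754726...",
                "General,Login,Success", 1));
    }
}
```

In the job itself this would be something like counts.map(new MapFunction<Tuple4<Integer, String, String, Integer>, String>() { ... }) followed by .writeAsText(outputPath, WriteMode.OVERWRITE).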
Regards,
Stefano
2014-11-25 22:04 GMT+01:00 Anirvan BASU <[email protected]>:
> Thanks to Aljoscha and Stefano for pointing out the flaw.
>
> We corrected the issue as follows:
>
> [CODE]
>
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> import org.apache.sling.commons.json.JSONException;
> ...
>
> public static void main(String[] args) throws Exception {
>
> if(!parseParameters(args)) {
> return;
> }
>
> // set up the execution environment
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
> // get input data
> DataSet<String> text = getTextDataSet(env);
>
> DataSet<Tuple4<Integer, String, String, Integer>> counts =
>         // split up the lines into 4-tuples containing: (timestamp, uuid, event, count)
>         text.flatMap(new SelectDataFlatMap())
>         // group by tuple field "1" (the uuid string) and sum up tuple field "3" (the integer count, 1 per record)
>         .groupBy(1)
>         .sum(3);
>
>
> // emit result
> if(fileOutput) {
> counts.writeAsCsv(outputPath, "\n", " ");
> } else {
> counts.print();
> }
>
> // execute program
> env.execute("Weblogs Programme");
> }
>
> ...
>
> public static class SelectDataFlatMap extends
>         JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {
>
> private static final long serialVersionUID = 1L;
>
> @Override
> public void flatMap(String value, Collector<Tuple4<Integer,
> String, String, Integer>> record)
> throws Exception {
> try {
> record.collect(new Tuple4<Integer, String, String,
> Integer>(
> getInt(value, "timestamp"),
> getString(value, "uuid"),
> getString(value, "event"),
> 1));
> } catch (JSONException e) {
> System.err.println("Field not found");
> }
> }
> }
>
>
> [/CODE]
>
> However, this time the issue was different.
> The programme executed correctly till status FINISHED.
> However, there was no output :-((
> i.e. For each Task Manager, an empty file is written.
>
> When we checked the input text file that is read using
> env.readTextFile(), we found that instead of the full text dataset,
> only a small string is written!
> Something like:
> org.apache.flink.api.java.operators.DataSource@6bd8b476
>
> Worse still! This string value sometimes remains the same over multiple
> runs of the programme ....
> Is this natural ? Is this just the handle to the file or the dataset ?
> Is the Collector() working correctly also ?
>
>
> Note :
> The actual JSON file (i.e. the text file that should be read) is of the
> following nature, with a 2-level hierarchy for one field:
> [JSON]
> {timestamp: 1397731764 payload: {product:
> Younited uuid:
> 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
> platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
> type: can-usage-v1 event: General,Login,Success}}
> {timestamp: 1397731765 payload: {product:
> Younited uuid:
> e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
> platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e
> type: can-usage-v1 event: General,App,Opened}}
> [/JSON]
>
>
> So now again, we are confused if we are doing it correctly :-((
>
> Thanks in advance for helping us to understand where we are going wrong.
> Anirvan
>
> ------------------------------
>
> From: "Stefano Bortoli" <[email protected]>
> To: "user" <[email protected]>
> Cc: [email protected]
> Sent: Tuesday, November 25, 2014 5:05:34 PM
> Subject: Re: Program crashes trying to read JSON file
>
>
> Very quickly: it seems you are trying to sum on Strings.
>
> Caused by: org.apache.flink.api.java.
> aggregation.UnsupportedAggregationTypeException: The type java.lang.String
> has currently not supported for built-in sum aggregations.
>
> Check your tuple types and be sure that you are not summing on strings.
>
>
> 2014-11-25 16:55 GMT+01:00 Anirvan BASU <[email protected]>:
>
>> Hello all,
>>
>> We are using Flink 0.7 and trying to read a large JSON file, reading some
>> fields into a flink (3-tuple based) dataset, then performing some
>> operations.
>>
>> We encountered the following runtime error:
>>
>> [QUOTE]
>> Error: The main method caused an error.
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
>> at org.apache.flink.client.program.Client.run(Client.java:244)
>> at
>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>> Caused by:
>> org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
>> The type java.lang.String has currently not supported for built-in sum
>> aggregations.
>> at
>> org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
>> at
>> org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
>> at
>> org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
>> at
>> org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
>> at
>> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
>> ... 6 more
>> [/QUOTE]
>>
>>
>>
>> The code snippet that could have caused this error (i.e. that we edited)
>> is the following
>>
>> [CODE]
>>
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.util.Collector;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
>> import org.apache.sling.commons.json.JSONException;
>> ...
>>
>> public static void main(String[] args) throws Exception {
>>
>> if(!parseParameters(args)) {
>> return;
>> }
>>
>> // set up the execution environment
>> final ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>
>> // get input data
>> DataSet<String> text = getTextDataSet(env);
>>
>> DataSet<Tuple3<Integer, String, String>> counts =
>>         // split up the lines into 3-tuples containing: (timestamp, uuid, event)
>>         text.flatMap(new SelectDataFlatMap())
>>         // group by tuple field "2" (the event string) and sum up tuple field "2"
>>         .groupBy(2)
>>         .sum(2);
>>
>> // emit result
>> if(fileOutput) {
>> counts.writeAsCsv(outputPath, "\n", " ");
>> } else {
>> counts.print();
>> }
>>
>> // execute program
>> env.execute("Weblogs Programme");
>> }
>>
>> ...
>>
>> public static class SelectDataFlatMap extends
>> JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
>>
>> @Override
>> public void flatMap(String value, Collector<Tuple3<Integer,
>> String, String>> out)
>> throws Exception {
>> try {
>> out.collect(new Tuple3<Integer, String, String>(
>> getInt(value, "timestamp"),
>> getString(value, "uuid"),
>> getString(value, "event")));
>> } catch (JSONException e) {
>> System.err.println("Field not found");
>> }
>> }
>> }
>>
>> [/CODE]
>>
>>
>>
>>
>>
>> The JSON file is of the following nature, with a 2-level hierarchy for
>> one field:
>> [JSON]
>> {timestamp: 1397731764 payload: {product:
>> Younited uuid:
>> 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd
>> platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241
>> type: can-usage-v1 event: General,Login,Success}}
>> {timestamp: 1397731765 payload: {product:
>> Younited uuid:
>> e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e
>> platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e
>> type: can-usage-v1 event: General,App,Opened}}
>> [/JSON]
>>
>>
>>
>> Thanks in advance for helping us to understand where we are going wrong.
>>
>> Anirvan
>>
>
>
>