Thanks to Aljoscha and Stefano for pointing out the flaw.
We corrected the issue as follows:
[CODE]
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...
public static void main(String[] args) throws Exception {
    if (!parseParameters(args)) {
        return;
    }
    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // get input data
    DataSet<String> text = getTextDataSet(env);
    DataSet<Tuple4<Integer, String, String, Integer>> counts =
        // split up the lines into 4-tuples containing: (timestamp, uuid, event, count)
        text.flatMap(new SelectDataFlatMap())
            // group by tuple field 2 (the event string) and sum up tuple field 3 (the integer count, always 1)
            .groupBy(2)
            .sum(3);
    // emit result
    if (fileOutput) {
        counts.writeAsCsv(outputPath, "\n", " ");
    } else {
        counts.print();
    }
    // execute program
    env.execute("Weblogs Programme");
}
...
public static class SelectDataFlatMap
        extends JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record)
            throws Exception {
        try {
            record.collect(new Tuple4<Integer, String, String, Integer>(
                    getInt(value, "timestamp"),
                    getString(value, "uuid"),
                    getString(value, "event"),
                    1));
        } catch (JSONException e) {
            System.err.println("Field not found");
        }
    }
}
[/CODE]
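As a point of reference, here is a plain-Java sketch (no Flink involved) of what the `flatMap` → `groupBy` → `sum` pipeline above is meant to compute when grouping by the event field: each parsed line contributes one `(timestamp, uuid, event, 1)` record, and the count field is summed per event. The `Event` and `EventCounts` classes and the sample values are hypothetical stand-ins for the `Tuple4` fields, and the sketch ignores Flink's parallel, per-partition execution.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Tuple4<Integer, String, String, Integer>:
// (timestamp, uuid, event, count)
class Event {
    final int timestamp;
    final String uuid;
    final String event;
    final int count;

    Event(int timestamp, String uuid, String event, int count) {
        this.timestamp = timestamp;
        this.uuid = uuid;
        this.event = event;
        this.count = count;
    }
}

class EventCounts {
    // Rough single-machine analogue of groupBy(2).sum(3): group on the event
    // field (index 2) and add up the count field (index 3).
    static Map<String, Integer> countByEvent(List<Event> records) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Event r : records) {
            totals.merge(r.event, r.count, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical sample records; uuids and the third timestamp are made up.
        List<Event> records = Arrays.asList(
                new Event(1397731764, "uuid-a", "General,Login,Success", 1),
                new Event(1397731765, "uuid-b", "General,App,Opened", 1),
                new Event(1397731766, "uuid-a", "General,Login,Success", 1));
        // Prints {General,App,Opened=1, General,Login,Success=2}
        System.out.println(countByEvent(records));
    }
}
```

Unlike this sketch, the Flink job keeps whole tuples in its result, so the timestamp and uuid columns written by `writeAsCsv` are not themselves aggregated.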
However, this time the issue was different.
The programme ran through to status FINISHED without errors.
However, there was no output :-((
That is, an empty file was written for each Task Manager.
When we looked further into the input text file that is read using
env.readTextFile(), we found that instead of a text string (the full text
dataset), only a short string is written!
Something like:
org.apache.flink.api.java.operators.DataSource@6bd8b476
Worse still, this string value sometimes remains the same over multiple runs
of the programme.
Is this expected? Is this just a handle to the file or to the dataset?
Is the Collector also working correctly?
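One possible reading, assuming the string came from printing or writing the `DataSet` variable itself (the message does not show that code): a Java object whose class does not override `toString()` falls back to `Object.toString()`, which returns the class name, an `@`, and the hash code in hexadecimal. `org.apache.flink.api.java.operators.DataSource@6bd8b476` has exactly that shape, so it identifies the data-source operator object rather than the file contents. In the DataSet API the records themselves only flow into sinks such as `print()` or `writeAsCsv()` once `env.execute()` runs. The `Handle` class below is a hypothetical stand-in for any class without its own `toString()`.

```java
// Hypothetical class that, like the operator object in question,
// does not override toString().
class Handle {
}

class ToStringDemo {
    // Rebuilds the documented default format of Object.toString():
    // getClass().getName() + "@" + Integer.toHexString(hashCode())
    static String defaultFormat(Object o) {
        return o.getClass().getName() + "@" + Integer.toHexString(o.hashCode());
    }

    public static void main(String[] args) {
        Handle h = new Handle();
        // Both lines print the same class-name@hex string: the object's
        // identity, not any data it might refer to.
        System.out.println(h);
        System.out.println(defaultFormat(h));
    }
}
```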
Note:
The actual JSON file (i.e. the text file that should be read) has the
following structure, with a two-level hierarchy for one field:
[JSON]
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
[/JSON]
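The two-level structure matters for field lookups. A minimal sketch, assuming each line has already been parsed into nested maps (real JSON parsing is omitted): a key that lives under `payload`, such as `uuid` or `event`, is not visible at the top level, so a lookup by the bare name finds nothing. The dotted-path helper `lookup` is hypothetical and is not the `JSONParseFlatMap` API. In the `flatMap` above, a `JSONException` from `getInt`/`getString` is caught and only reported on stderr ("Field not found"), so the affected line is never passed to the Collector.

```java
import java.util.HashMap;
import java.util.Map;

class NestedLookup {
    // Hypothetical helper: resolves a dot-separated path such as "payload.uuid"
    // against nested maps, returning null as soon as a step is missing.
    static Object lookup(Map<String, Object> root, String path) {
        Object current = root;
        for (String key : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    // First sample record from the message, rebuilt by hand as nested maps
    // (only some payload fields are included).
    static Map<String, Object> sampleRecord() {
        Map<String, Object> payload = new HashMap<>();
        payload.put("product", "Younited");
        payload.put("uuid", "754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd");
        payload.put("event", "General,Login,Success");

        Map<String, Object> root = new HashMap<>();
        root.put("timestamp", 1397731764);
        root.put("payload", payload);
        return root;
    }

    public static void main(String[] args) {
        Map<String, Object> rec = sampleRecord();
        System.out.println(lookup(rec, "timestamp"));     // 1397731764
        System.out.println(lookup(rec, "uuid"));          // null: not a top-level key
        System.out.println(lookup(rec, "payload.event")); // General,Login,Success
    }
}
```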
So once again, we are unsure whether we are doing this correctly :-((
Thanks in advance for helping us to understand where we are going wrong.
Anirvan
----- Original Message -----
> From: "Stefano Bortoli"
> To: "user"
> Sent: Tuesday, November 25, 2014 5:05:34 PM
> Subject: Re: Program crashes trying to read JSON file
> Very quickly, it seems you are trying to sum on Strings:
> Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException:
> The type java.lang.String has currently not supported for built-in sum aggregations.
> Check your tuple types and be sure that you are not summing on strings.
> 2014-11-25 16:55 GMT+01:00 Anirvan BASU:
> > Hello all,
>
> > We are using Flink 0.7 and trying to read a large JSON file, reading some
> > fields into a Flink (3-tuple based) DataSet, then performing some
> > operations.
>
> > We encountered the following runtime error:
>
> > [QUOTE]
> > Error: The main method caused an error.
> > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
> >     at org.apache.flink.client.program.Client.run(Client.java:244)
> >     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
> >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
> >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
> >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
> > Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
> >     at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
> >     at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
> >     at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
> >     at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
> >     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:606)
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
> >     ... 6 more
> > [/QUOTE]
>
> > The code snippet that could have caused this error (i.e. the part we edited)
> > is the following:
>
> > [CODE]
> > import org.apache.flink.api.java.tuple.Tuple3;
> > import org.apache.flink.util.Collector;
> > import org.apache.flink.api.java.DataSet;
> > import org.apache.flink.api.java.ExecutionEnvironment;
> > import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
> > import org.apache.sling.commons.json.JSONException;
> > ...
> > public static void main(String[] args) throws Exception {
> >     if (!parseParameters(args)) {
> >         return;
> >     }
> >     // set up the execution environment
> >     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> >     // get input data
> >     DataSet<String> text = getTextDataSet(env);
> >     DataSet<Tuple3<Integer, String, String>> counts =
> >         // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event)
> >         text.flatMap(new SelectDataFlatMap())
> >             // group by the tuple field "0" and sum up tuple field "1"
> >             .groupBy(2)
> >             .sum(2);
> >     // emit result
> >     if (fileOutput) {
> >         counts.writeAsCsv(outputPath, "\n", " ");
> >     } else {
> >         counts.print();
> >     }
> >     // execute program
> >     env.execute("Weblogs Programme");
> > }
> > ...
> > public static class SelectDataFlatMap
> >         extends JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
> >     @Override
> >     public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
> >             throws Exception {
> >         try {
> >             out.collect(new Tuple3<Integer, String, String>(
> >                     getInt(value, "timestamp"),
> >                     getString(value, "uuid"),
> >                     getString(value, "event")));
> >         } catch (JSONException e) {
> >             System.err.println("Field not found");
> >         }
> >     }
> > }
> > [/CODE]
>
> > The JSON file is of the following nature, with a 2-level hierarchy for one
> > field:
>
> > [JSON]
> > {timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
> > {timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
> > [/JSON]
>
> > Thanks in advance for helping us to understand where we are going wrong.
>
> > Anirvan
>
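Stefano's diagnosis in the quoted reply comes down to 0-based tuple indices: in a `Tuple3<Integer, String, String>` holding (timestamp, uuid, event), index 2 is the event `String`, so `sum(2)` asks for a numeric sum over text, which the built-in sum aggregation rejects. The plain-Java sketch below mirrors that kind of check on a list of field types. The `SumFieldCheck` class and its `SUMMABLE` list are assumptions about which boxed numeric types count as summable, not a copy of Flink's implementation. It also shows why the corrected version, which appends an `Integer` count at index 3 of a `Tuple4`, passes.

```java
import java.util.Arrays;
import java.util.List;

class SumFieldCheck {
    // Assumed set of boxed numeric types that a built-in sum can handle.
    static final List<Class<?>> SUMMABLE = Arrays.<Class<?>>asList(
            Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class);

    // True if the field at the given 0-based index has a summable type.
    static boolean canSum(List<Class<?>> fieldTypes, int index) {
        return SUMMABLE.contains(fieldTypes.get(index));
    }

    public static void main(String[] args) {
        // (timestamp, uuid, event), as in the original Tuple3
        List<Class<?>> tuple3 = Arrays.<Class<?>>asList(
                Integer.class, String.class, String.class);
        // (timestamp, uuid, event, count), as in the corrected Tuple4
        List<Class<?>> tuple4 = Arrays.<Class<?>>asList(
                Integer.class, String.class, String.class, Integer.class);

        System.out.println(canSum(tuple3, 2)); // false: index 2 is the event String
        System.out.println(canSum(tuple4, 3)); // true: index 3 is the Integer count
    }
}
```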