Hi all,

I have implemented a RichInputFormat for reading the result of aggregation queries from Elasticsearch. There are around 100000 buckets, which are of type JSON array. Note: this is a one-time response.
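Since the goal is to process the buckets in parallel, here is a minimal sketch of how the bucket indices could be divided into contiguous ranges, one per input split. It is plain Java and only illustrates the partitioning arithmetic. The names BucketPartitioner and Range are hypothetical; they are not part of Flink and not my actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: computes contiguous index ranges
// so that each input split covers a roughly equal share of the buckets.
final class BucketPartitioner {

    // Half-open range [start, end) of bucket indices owned by one split.
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        int size() {
            return end - start;
        }
    }

    // Divides totalBuckets indices into at most numSplits non-empty ranges.
    // The first (totalBuckets % numSplits) ranges get one extra bucket.
    static List<Range> partition(int totalBuckets, int numSplits) {
        if (totalBuckets < 0 || numSplits < 1) {
            throw new IllegalArgumentException(
                "totalBuckets must be >= 0 and numSplits must be >= 1");
        }
        List<Range> ranges = new ArrayList<>();
        int base = totalBuckets / numSplits;
        int remainder = totalBuckets % numSplits;
        int start = 0;
        for (int i = 0; i < numSplits && start < totalBuckets; i++) {
            int size = base + (i < remainder ? 1 : 0);
            ranges.add(new Range(start, start + size));
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Example: 100000 buckets spread over 4 splits.
        for (Range r : partition(100000, 4)) {
            System.out.println("[" + r.start + ", " + r.end + ")");
        }
    }
}
```

Each range could then be stored in one split object, so that the split itself says which buckets it is responsible for.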
My idea here is to iterate these arrays in parallel. Here is the pseudo code.

    public void configure(Configuration parameters) {
        System.out.println("configure");
    }

    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
    }

    public ResponseInputSplit[] createInputSplits(int minNumSplits) {
        System.out.println("createInputSplits");
        // read from elastic
        // add buckets to array
    }

    public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[] inputSplits) {
        // this is default
        System.out.println("getInputSplitAssigner");
        return new DefaultInputSplitAssigner(inputSplits);
    }

    public void open(ResponseInputSplit split) {
        // read buckets
    }

    public boolean reachedEnd() {
        System.out.println("reachedEnd");
    }

    public Bounce nextRecord(Bounce reuse) {
    }

    public void close() {
    }

    // my main method
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Bounce> bounce_data_set = env.createInput(new MyInputDataSetInputFormat());

When running in Eclipse, it executes createInputSplits and the results look fine. Logs are given below.

Output is -->

    configure
    Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1685591882] with leader session id...
    configure
    createInputSplits

When submitting the job to the Flink cluster, it doesn't execute the 'configure' and 'createInputSplits' methods. Instead it goes directly to the nextRecord function. Logs are given below.

Output is -->

    Starting execution of program
    configure
    Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for job completion.
    Connected to JobManager at Actor[akka.tcp://flink@xxxx:xxx/user/jobmanager#1219973491] with leader session id...
    10/26/2018 15:05:57 Job execution switched to status RUNNING.
    10/26/2018 15:05:57 DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED
    10/26/2018 15:05:57 DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING
    10/26/2018 15:06:00 DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING
    10/26/2018 15:06:00 DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED
    java.lang.NullPointerException
        at com.xxx.test.MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)

Regards,
Teena