Hi all,

I have implemented a RichInputFormat to read the results of aggregation
queries from Elasticsearch. The response contains around 100,000 buckets,
returned as a JSON array. Note: this is a one-time response.

My idea is to iterate over these arrays in parallel. Here is the pseudocode:

public void configure(Configuration parameters) {
System.out.println("configure");
}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
}

public ResponseInputSplit[] createInputSplits(int minNumSplits){
System.out.println("createInputSplits");

//read from elastic
// add buckets to array
}

public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[] inputSplits) {
//this is default
System.out.println("getInputSplitAssigner");
return new DefaultInputSplitAssigner(inputSplits);
}

public void open(ResponseInputSplit split) {
//read buckets
}

public boolean reachedEnd(){
System.out.println("reachedEnd");
}

public Bounce nextRecord(Bounce reuse) {
}

public void close(){
}

// my main method,
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Bounce> bounce_data_set = env.createInput(new MyInputDataSetInputFormat());
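
To make the "iterate in parallel" idea concrete, here is a minimal sketch of how the buckets could be divided into near-equal chunks (one per split) and then read back one record at a time. It is plain Java with no Flink dependency. BucketPartitioner and BucketReader are hypothetical names used only for illustration, not part of my actual job or of Flink, and the sketch assumes each chunk would travel inside its own split object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink class): divides the buckets into chunks,
// roughly what createInputSplits would hand out as splits.
final class BucketPartitioner {
    private BucketPartitioner() {}

    // Returns at most numSplits contiguous chunks whose sizes differ by at most one.
    // An empty input yields a single empty chunk.
    static <T> List<List<T>> partition(List<T> buckets, int numSplits) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("numSplits must be >= 1");
        }
        int n = buckets.size();
        int parts = Math.min(numSplits, Math.max(n, 1));
        int base = n / parts;   // minimum chunk size
        int extra = n % parts;  // the first `extra` chunks get one more element
        List<List<T>> result = new ArrayList<>(parts);
        int start = 0;
        for (int i = 0; i < parts; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(buckets.subList(start, start + size)));
            start += size;
        }
        return result;
    }
}

// Hypothetical reader mirroring the open / reachedEnd / nextRecord sequence
// from the pseudocode above, for a single chunk.
final class BucketReader<T> {
    private List<T> current = new ArrayList<>(); // never null, so reachedEnd is safe before open
    private int position = 0;

    void open(List<T> split) {
        current = split;
        position = 0;
    }

    boolean reachedEnd() {
        return position >= current.size();
    }

    T nextRecord() {
        return current.get(position++);
    }
}
```

In this sketch every piece of state that nextRecord touches is set in open from the chunk it receives, so the reader does not depend on anything that was computed while the chunks were being created.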

When I run the job in Eclipse, createInputSplits is executed and the results
look fine. The output is:
configure
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1685591882] with leader session id...
configure
createInputSplits

When I submit the job to a Flink cluster, it does not appear to execute the
'configure' and 'createInputSplits' methods on the cluster. Instead, it goes
directly to the nextRecord function. The output is:
Starting execution of program
configure
Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@xxxx:xxx/user/jobmanager#1219973491] with leader session id...
10/26/2018 15:05:57     Job execution switched to status RUNNING.
10/26/2018 15:05:57     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED
10/26/2018 15:05:57     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING
10/26/2018 15:06:00     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING
10/26/2018 15:06:00     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED
java.lang.NullPointerException
    at com.xxx.test.MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)

Regards,
Teena
