Aleksandr Iushmanov created ZEPPELIN-5909:
---------------------------------------------
Summary: Flink115SqlInterpreter throws IOException on successful insert operation
Key: ZEPPELIN-5909
URL: https://issues.apache.org/jira/browse/ZEPPELIN-5909
Project: Zeppelin
Issue Type: Bug
Components: zeppelin-interpreter
Affects Versions: 0.11.0
Reporter: Aleksandr Iushmanov
There is a race condition in Flink 1.15 and above that breaks the code below:
{code:java}
TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
    context.out.write("Insertion successfully.\n");
  } else {
    throw new IOException("Job is failed, " +
        jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
}
{code}
For a short period after the job result is already available,
jobClient.getJobStatus() can still return RUNNING. The check then takes the
else branch, and the Zeppelin interpreter throws an IOException even though the
job succeeded:
Job is failed, Program execution finished
Example of a working patch:
{code:java}
private void waitForJobStatusFinishedOrThrow(
    final JobClient jobClient,
    final long timeoutMillis,
    final long retryIntervalMillis)
    throws IOException, InterruptedException, ExecutionException {
  final long startTime = System.currentTimeMillis();
  JobStatus status = jobClient.getJobStatus().get();
  while (status.equals(JobStatus.RUNNING)
      && System.currentTimeMillis() - startTime < timeoutMillis) {
    Thread.sleep(retryIntervalMillis);
    status = jobClient.getJobStatus().get();
  }
  if (!status.equals(JobStatus.FINISHED)) {
    throw new IOException("Job reached terminal state with result: "
        + jobClient.getJobExecutionResult().get().toString()
        + " but job status is not FINISHED after timeout: " + timeoutMillis);
  }
}
{code}
{code:java}
tableResult.await();
// Check https://i.amazon.com/issues/MERRIMAC-32509 for more details on the bug
// investigation and the linked upstream Jira tickets for Flink 1.16 onwards and
// Zeppelin support for the Flink 1.15 SQL interpreter.
waitForJobStatusFinishedOrThrow(
    tableResult.getJobClient().get(),
    TimeUnit.SECONDS.toMillis(30),
    TimeUnit.SECONDS.toMillis(1)
);
context.out.write("Insertion successfully.\n");
{code}
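The retry loop in the patch above can be tried out without a Flink cluster. The following is only a sketch, not part of the proposed patch: the class StatusPolling, the enum Status (a stand-in for Flink's JobStatus) and the method pollWhileRunning are hypothetical names, and the status source is a plain Supplier instead of a JobClient. It uses System.nanoTime() for the deadline, which is an assumption on my side that a monotonic clock is preferable to System.currentTimeMillis() here.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class StatusPolling {

    /** Hypothetical stand-in for Flink's JobStatus, reduced to the states used here. */
    enum Status { RUNNING, FINISHED, FAILED }

    /**
     * Re-reads the status while it is RUNNING, until the timeout elapses.
     * Returns the last status observed; the caller decides how to report it.
     */
    static Status pollWhileRunning(Supplier<Status> statusSource,
                                   long timeoutMillis,
                                   long retryIntervalMillis) throws InterruptedException {
        // nanoTime is monotonic, so wall-clock adjustments do not change the wait.
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        Status status = statusSource.get();
        while (status == Status.RUNNING && System.nanoTime() - deadline < 0) {
            Thread.sleep(retryIntervalMillis);
            status = statusSource.get();
        }
        return status;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates the race: the first two reads still report RUNNING.
        Iterator<Status> reads =
                List.of(Status.RUNNING, Status.RUNNING, Status.FINISHED).iterator();
        Status last = pollWhileRunning(reads::next, 1_000, 10);
        System.out.println(last); // prints FINISHED
    }
}
```

If the status source never leaves RUNNING, the method returns RUNNING once the timeout has elapsed, which corresponds to the case where the patch throws its IOException.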