As a follow-up: I’m trying to follow the approach I outlined below, and I’m
having trouble figuring out how to perform the delete/insert step after the
job is complete.
I’ve tried adding a job listener, like so, but it never seems to get fired:
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.core.execution.{JobClient, JobListener}

val statementSet = streamTableEnv.createStatementSet()
statementSet.addInsertSql("""
  INSERT INTO table1_staging
  SELECT * FROM table
""")
statementSet.addInsertSql("""
  INSERT INTO table2_staging
  SELECT * FROM table
""")
statementSet.addInsertSql("""
  INSERT INTO table3_staging
  SELECT * FROM table3
""")
streamEnv.registerJobListener(new JobListener() {
  // Nothing to do on submission.
  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {}

  // Expected to be called once the job has finished; result may be null on failure.
  override def onJobExecuted(result: JobExecutionResult, throwable: Throwable): Unit = {
    val time = Option(result).map(_.getNetRuntime())
    if (throwable == null) {
      Log.info(s"Completed job successfully in $time milliseconds")
    } else {
      Log.error(s"Unable to execute job successfully", throwable)
    }
  }
})
statementSet.execute()
I tried the above with the execute() call both before and after registering the
listener, but it doesn’t seem to fire in either case.
I also tried this:
Try(statementSet.execute().getJobClient().get().getJobStatus().join())
  .map { _ =>
    Log.info(s"Completed job successfully")
  }
  .recover {
    case t => {
      Log.error(s"Unable to execute job successfully", t)
    }
  }
And this seems to have fired well before the job actually finished flowing all
the data through. I tried both join and get on the job status CompletableFuture.
Is there anything I’m missing as far as being able to tell when the job is
complete? Again, this is Flink 1.11.2 that I’m running.
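To make clear what I am after: I want to block on a future that only completes when the job has terminated, and get back either the result or the failure. Below is a minimal sketch of that using plain CompletableFuture. The AwaitJob helper is a hypothetical name of mine, and the Flink call in the comment is an assumption I have not verified against 1.11.2. My guess is that getJobStatus only reports the status at the time of the call, which would explain why my attempt above returned early.

```scala
import java.util.concurrent.{CompletableFuture, CompletionException}
import scala.util.{Failure, Try}

// Hypothetical helper, not part of Flink.
object AwaitJob {
  // Blocks until the future completes. A failure is returned as Failure,
  // with the CompletionException wrapper removed so the real cause is visible.
  def await[T](future: CompletableFuture[T]): Try[T] =
    Try(future.join()).recoverWith {
      case e: CompletionException if e.getCause != null => Failure(e.getCause)
    }
}

// Assumed usage with Flink 1.11 (unverified; the ClassLoader argument is my
// reading of the 1.11 JobClient interface):
//   val client = statementSet.execute().getJobClient().get()
//   AwaitJob.await(client.getJobExecutionResult(getClass.getClassLoader))
```

If that future really does complete only on job termination, the delete/insert step could run on the Success branch.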
Thanks,
Dylan Forciea
From: Dylan Forciea <[email protected]>
Date: Monday, December 7, 2020 at 8:04 AM
To: "[email protected]" <[email protected]>
Subject: Batch loading into postgres database
I am setting up a Flink job that will reload a table in a postgres database
using the Flink SQL functionality. I just wanted to make sure that given the
current feature set I am going about this the correct way. I am currently using
version 1.11.2, but plan on upgrading to 1.12 soon whenever it is finalized.
I have set up a staging table and a final table in a postgres database. My plan
is to have a Flink application that truncates the staging table using JDBC
before the job begins, runs the job to completion, and then, after the job
completes, uses JDBC to delete/insert into the final table from the staging
table in a single transaction.
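As a rough sketch of the delete/insert step I have in mind, assuming plain JDBC and that the staging and final tables have identical columns: StagingSwap and its methods are hypothetical names, and the table names passed in are placeholders that must come from trusted code, since they are interpolated directly into the SQL.

```scala
import java.sql.Connection

// Hypothetical helper for illustration only.
object StagingSwap {
  // Statements that replace all rows of the final table with the staging rows.
  def swapStatements(stagingTable: String, finalTable: String): Seq[String] = Seq(
    s"DELETE FROM $finalTable",
    s"INSERT INTO $finalTable SELECT * FROM $stagingTable"
  )

  // Runs all statements in one transaction and rolls back if any of them fails.
  def runInTransaction(conn: Connection, statements: Seq[String]): Unit = {
    val previousAutoCommit = conn.getAutoCommit
    conn.setAutoCommit(false)
    try {
      val stmt = conn.createStatement()
      try statements.foreach(sql => stmt.executeUpdate(sql))
      finally stmt.close()
      conn.commit()
    } catch {
      case e: Throwable =>
        conn.rollback()
        throw e
    } finally {
      // Restore whatever auto-commit mode the caller had.
      conn.setAutoCommit(previousAutoCommit)
    }
  }
}
```

After the job finishes, this would be called along the lines of StagingSwap.runInTransaction(conn, StagingSwap.swapStatements("table1_staging", "table1")), where conn is a java.sql.Connection to the postgres database and "table1" is a placeholder for the final table name.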
Is this the expected way to interact with postgres in a batch job like this? Or
is there some functionality or method that I am missing?
Regards,
Dylan Forciea