Hi Flink Community,

I need some help with JDBC sink in Datastream API. I can produce some records 
and sink it to database correctly. However, if I wait for 5 minutes between 
insertions. I will run into broken pipeline issue. Ater that, the Flink 
application will restart and recover from checkpoint and execute the failed SQL 
query. I tried hard to search for resources to understand such broken pipeline 
will happen, but I still can’t understand it.

The interesting thing is that, if the idle time is around 3 minutes, everything 
seems to be fine.

It seems to be a timeout related issue, but I just don’t know what should I do 
to fix the issue. I have shared the sink code. Could anyone share some ideas? 
Thank you so much!
My environment settings:
Flink version: 1.12.1
Scala version: 2.11
Java version: 1.11
Flink System parallelism: 1
JDBC Driver: Oracle ojdbc10
Database: Oracle Autonomous Database on Oracle Cloud Infrastructure(You can 
regard this as an cloud based Oracle Database)

The code for the sink:
        boDataStream
        .addSink(
            JdbcSink.sink(
                "INSERT INTO TEST_ADW (invoice_id, json_doc) values(?, ?)",
                (preparedStatement, testInvoiceBo) -> {
                  try {
                      Gson gson = new GsonBuilder()
                              .excludeFieldsWithoutExposeAnnotation()
                              .create();
                      String invoiceId = testInvoiceBo.getINVOICE_ID();
                      String json = gson.toJson(testInvoiceBo);
                      log.info("insertion information: {}", json);
                      preparedStatement.setString(1, invoiceId);
                      preparedStatement.setString(2, json);
                  } catch (JsonIOException e) {
                      log.error("Failed to parse JSON", e);
                  }
                },
                new JdbcExecutionOptions.Builder()
                .withBatchIntervalMs(0)
                .withBatchSize(1)
                .withMaxRetries(3)
                .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl(DB_URL)
                    .withDriverName("oracle.jdbc.driver.OracleDriver")
                    .withUsername("admin")
                    .withPassword("password")
                    .build()))
        .name("adwSink")
        .uid("adwSink")
        .setParallelism(1);

The JDBC broken pipeline log:


Reply via email to