curcur commented on a change in pull request #15636: URL: https://github.com/apache/flink/pull/15636#discussion_r615913550
########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java ########## @@ -115,10 +116,6 @@ public void close() throws SQLException { connection.close(); connection = null; } - if (xaConnection != null) { - xaConnection.close(); - xaConnection = null; - } Review comment: Why is this removed? Does it no longer need to be closed? ########## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java ########## @@ -102,19 +212,108 @@ public void cancel() { @Override public void notifyCheckpointComplete(long checkpointId) { - if (checkpointId == this.checkpointAfterData) { - dataCheckpointed = true; + if (checkpointId == this.lastCheckpointId) { + lastSnapshotConfirmed = true; + } + } Review comment: Would this cause test instability? ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java ########## @@ -178,7 +175,10 @@ public void failOrRollback(Xid xid) { Command.fromRunnable( "end (fail)", xid, - () -> xaResource.end(xid, XAResource.TMFAIL), + () -> { + xaResource.end(xid, XAResource.TMFAIL); + xaResource.rollback(xid); Review comment: This looks like a `FailAndRollback`? 
########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -234,13 +231,19 @@ public void open(Configuration configuration) throws Exception { xaGroupOps.recoverAndRollback(); } beginTx(0L); + outputFormat.setRuntimeContext(getRuntimeContext()); + // open format only after starting the transaction so it gets a ready to use connection + outputFormat.open( + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks()); } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { LOG.debug("snapshot state, checkpointId={}", context.getCheckpointId()); prepareCurrentTx(context.getCheckpointId()); beginTx(context.getCheckpointId() + 1); + outputFormat.reconnect(false); // associate with potentially new connection Review comment: This is to let the output format connect to a new connection after a new transaction begins, right? Maybe give it a name that reflects that and make the comment more understandable? It is difficult to infer what "potentially" means here... 
########## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java ########## @@ -31,68 +37,172 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.ExceptionUtils; +import org.junit.Rule; import org.junit.Test; +import org.postgresql.xa.PGXADataSource; +import org.testcontainers.containers.PostgreSQLContainer; -import java.io.Serializable; +import javax.sql.XADataSource; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB; +import static java.util.Collections.singletonList; +import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY; import static org.apache.flink.connector.jdbc.JdbcTestFixture.INPUT_TABLE; import static org.apache.flink.connector.jdbc.JdbcTestFixture.INSERT_TEMPLATE; -import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA; +import static org.apache.flink.connector.jdbc.xa.JdbcXaFacadeTestHelper.getInsertedIds; +import static org.apache.flink.util.Preconditions.checkState; +import static org.junit.Assert.assertTrue; /** A simple end-to-end test for {@link JdbcXaSinkFunction}. 
*/ -public class JdbcExactlyOnceSinkE2eTest extends JdbcXaSinkTestBase { +public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase { + + private static final class PgXaDb extends PostgreSQLContainer<PgXaDb> { + public PgXaDb(String dockerImageName) { + super(dockerImageName); + // set max_prepared_transactions to non-zero + this.setCommand("postgres", "-c", "max_prepared_transactions=50", "-c", "fsync=off"); + } + } + + @Rule public PgXaDb db = new PgXaDb("postgres:9.6.12"); + + @Override + public void after() throws Exception { + // no need for cleanup - done by test container tear down + } @Test public void testInsert() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - env.setRestartStrategy(new NoRestartStrategyConfiguration()); + int parallelism = 4, + elementsPerSource = 500, + numElementsPerCheckpoint = 7, + minElementsPerFailure = numElementsPerCheckpoint / 3, + maxElementsPerFailure = numElementsPerCheckpoint * 3; Review comment: They are cool, but maybe declare them the normal way, one variable per statement? int xxx = ...; int yyy = ...; ########## File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java ########## @@ -224,11 +224,11 @@ public void open(Configuration configuration) throws Exception { hangingXids = new LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry()); commitUpToCheckpoint(Optional.empty()); if (options.isDiscoverAndRollbackOnRecovery()) { - // todo: consider doing recover-rollback later (e.g. after the 1st checkpoint) - // when we are sure that all other subtasks started and committed any of their prepared - // transactions - // this would require to distinguish between this job Xids and other Xids - xaGroupOps.recoverAndRollback(); + // Pending transactions which are not included into the checkpoint might hold locks and + // should be rolled back. 
However, rolling back ALL transactions can cause data loss. So + // each subtask first commits transactions from its state and then rolls back discovered + // transactions if they belong to it. + xaGroupOps.recoverAndRollback(getRuntimeContext(), xidGenerator); Review comment: I do not understand this part; let's sync up offline tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org