curcur commented on a change in pull request #15636:
URL: https://github.com/apache/flink/pull/15636#discussion_r615913550



##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java
##########
@@ -115,10 +116,6 @@ public void close() throws SQLException {
             connection.close();
             connection = null;
         }
-        if (xaConnection != null) {
-            xaConnection.close();
-            xaConnection = null;
-        }

Review comment:
       Why is this removed? Doesn't `xaConnection` need to be closed?
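
   For context, a minimal sketch of the usual `javax.sql` XA lifecycle (illustrative only; whether the `XAConnection` is now closed elsewhere in the facade is an assumption). Closing only the logical `Connection` does not by itself release the physical `XAConnection`:

```java
import java.sql.Connection;
import javax.sql.XAConnection;
import javax.sql.XADataSource;

class XaLifecycleSketch {
    // Illustrative only: the logical Connection is obtained from the XAConnection,
    // so both handles are normally closed to release the physical connection.
    static void useAndClose(XADataSource xaDataSource) throws Exception {
        XAConnection xaConnection = xaDataSource.getXAConnection();
        Connection connection = xaConnection.getConnection();
        try {
            // ... enlist via xaConnection.getXAResource() and run statements on connection ...
        } finally {
            connection.close();   // closes the logical handle
            xaConnection.close(); // releases the physical connection
        }
    }
}
```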

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
##########
@@ -102,19 +212,108 @@ public void cancel() {
 
         @Override
         public void notifyCheckpointComplete(long checkpointId) {
-            if (checkpointId == this.checkpointAfterData) {
-                dataCheckpointed = true;
+            if (checkpointId == this.lastCheckpointId) {
+                lastSnapshotConfirmed = true;
+            }
+        }

Review comment:
       Would this cause test instability?
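
   A minimal sketch of the gating pattern this appears to implement (class and method names outside the diff are assumptions): the source finishes only after the checkpoint covering its last element is confirmed. Since checkpoint-complete notifications are best-effort in Flink, a `>=` check might be more robust than `==` against a missed notification:

```java
import org.apache.flink.api.common.state.CheckpointListener;

// Sketch only, not the PR's actual source: finish the test source only after the
// checkpoint that covers the last emitted element has been confirmed.
class LastCheckpointGate implements CheckpointListener {
    private volatile long lastCheckpointId = Long.MAX_VALUE; // set once all data is emitted
    private volatile boolean lastSnapshotConfirmed;

    void lastElementSnapshotted(long checkpointId) {
        this.lastCheckpointId = checkpointId;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // >= guards against the notification for the exact id being skipped,
        // since completion notifications are best-effort.
        if (checkpointId >= lastCheckpointId) {
            lastSnapshotConfirmed = true;
        }
    }

    boolean canFinish() {
        return lastSnapshotConfirmed;
    }
}
```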

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java
##########
@@ -178,7 +175,10 @@ public void failOrRollback(Xid xid) {
                 Command.fromRunnable(
                         "end (fail)",
                         xid,
-                        () -> xaResource.end(xid, XAResource.TMFAIL),
+                        () -> {
+                            xaResource.end(xid, XAResource.TMFAIL);
+                            xaResource.rollback(xid);

Review comment:
       This looks like a `FailAndRollback`? 
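
   For reference, the standard XA way to abort an active branch is exactly this two-step sequence, so a name like `FailAndRollback` would match the protocol. A minimal sketch (class and method names are illustrative, not the PR's API):

```java
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

class FailAndRollbackSketch {
    // XA aborts an active branch in two steps: end(xid, TMFAIL) dissociates the
    // branch and marks it rollback-only, and rollback(xid) then discards its work.
    static void failAndRollback(XAResource xaResource, Xid xid) throws XAException {
        xaResource.end(xid, XAResource.TMFAIL);
        xaResource.rollback(xid);
    }
}
```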

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
##########
@@ -234,13 +231,19 @@ public void open(Configuration configuration) throws Exception {
             xaGroupOps.recoverAndRollback();
         }
         beginTx(0L);
+        outputFormat.setRuntimeContext(getRuntimeContext());
+        // open format only after starting the transaction so it gets a ready to use connection
+        outputFormat.open(
+                getRuntimeContext().getIndexOfThisSubtask(),
+                getRuntimeContext().getNumberOfParallelSubtasks());
     }
 
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
         LOG.debug("snapshot state, checkpointId={}", context.getCheckpointId());
         prepareCurrentTx(context.getCheckpointId());
         beginTx(context.getCheckpointId() + 1);
+        outputFormat.reconnect(false); // associate with potentially new connection

Review comment:
       This is to let the output format connect to a new connection after a new transaction begins, right? Maybe name it something along those lines and make the comment more explicit?
   
   It is difficult to infer what "potentially" means here...
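
   One possible rewording of that inline comment, as a suggestion only (this just paraphrases the diff above; the `reconnect` semantics are inferred from its usage here):

```java
prepareCurrentTx(context.getCheckpointId());
beginTx(context.getCheckpointId() + 1);
// beginTx() may hand the facade a different physical connection than before;
// re-associate the output format (and its statements) with that connection.
outputFormat.reconnect(false);
```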

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
##########
@@ -31,68 +37,172 @@
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.ExceptionUtils;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.postgresql.xa.PGXADataSource;
+import org.testcontainers.containers.PostgreSQLContainer;
 
-import java.io.Serializable;
+import javax.sql.XADataSource;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.INPUT_TABLE;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.INSERT_TEMPLATE;
-import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
+import static org.apache.flink.connector.jdbc.xa.JdbcXaFacadeTestHelper.getInsertedIds;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
 
 /** A simple end-to-end test for {@link JdbcXaSinkFunction}. */
-public class JdbcExactlyOnceSinkE2eTest extends JdbcXaSinkTestBase {
+public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
+
+    private static final class PgXaDb extends PostgreSQLContainer<PgXaDb> {
+        public PgXaDb(String dockerImageName) {
+            super(dockerImageName);
+            // set max_prepared_transactions to non-zero
+            this.setCommand("postgres", "-c", "max_prepared_transactions=50", "-c", "fsync=off");
+        }
+    }
+
+    @Rule public PgXaDb db = new PgXaDb("postgres:9.6.12");
+
+    @Override
+    public void after() throws Exception {
+        // no need for cleanup - done by test container tear down
+    }
 
     @Test
     public void testInsert() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        env.setRestartStrategy(new NoRestartStrategyConfiguration());
+        int parallelism = 4,
+                elementsPerSource = 500,
+                numElementsPerCheckpoint = 7,
+                minElementsPerFailure = numElementsPerCheckpoint / 3,
+                maxElementsPerFailure = numElementsPerCheckpoint * 3;

Review comment:
       These chained declarations are cool, but maybe write them out in the normal way?
   int xxx = ...
   int yyy = ...
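
   Spelled out in that style (values copied from the diff above):

```java
int parallelism = 4;
int elementsPerSource = 500;
int numElementsPerCheckpoint = 7;
int minElementsPerFailure = numElementsPerCheckpoint / 3;
int maxElementsPerFailure = numElementsPerCheckpoint * 3;
```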

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
##########
@@ -224,11 +224,11 @@ public void open(Configuration configuration) throws Exception {
         hangingXids = new LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry());
         commitUpToCheckpoint(Optional.empty());
         if (options.isDiscoverAndRollbackOnRecovery()) {
-            // todo: consider doing recover-rollback later (e.g. after the 1st checkpoint)
-            // when we are sure that all other subtasks started and committed any of their prepared
-            // transactions
-            // this would require to distinguish between this job Xids and other Xids
-            xaGroupOps.recoverAndRollback();
+            // Pending transactions which are not included into the checkpoint might hold locks and
+            // should be rolled back. However, rolling back ALL transactions can cause data loss. So
+            // each subtask first commits transactions from its state and then rolls back discovered
+            // transactions if they belong to it.
+            xaGroupOps.recoverAndRollback(getRuntimeContext(), xidGenerator);

Review comment:
       I don't understand this part; let's sync up offline tomorrow.
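
   To summarize my reading of the new comment, a minimal sketch of the recover-then-filter idea (the ownership check is hypothetical; the PR presumably derives it from the `XidGenerator` and the runtime context):

```java
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

class RecoverAndRollbackSketch {
    /** Hypothetical ownership check; the real code would derive this from the XidGenerator. */
    interface XidOwnership {
        boolean belongsToThisSubtask(Xid xid);
    }

    // Recover prepared branches, but only roll back the ones this subtask owns;
    // branches of other subtasks may still be committed by them, so rolling those
    // back could lose data.
    static void recoverAndRollbackOwn(XAResource xaResource, XidOwnership ownership)
            throws XAException {
        for (Xid xid : xaResource.recover(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN)) {
            if (ownership.belongsToThisSubtask(xid)) {
                xaResource.rollback(xid);
            }
        }
    }
}
```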




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

