eskabetxe commented on code in PR #2:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/2#discussion_r1444608746


##########
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/domain/TransactionId.java:
##########
@@ -0,0 +1,271 @@
+package org.apache.flink.connector.jdbc.datasource.transactions.xa.domain;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.util.StringUtils.byteToHexString;
+
+/** A simple {@link Xid} implementation. */
+public class TransactionId implements Xid, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final int FORMAT_ID = 202;
+
+    private final byte[] jobId;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+    private final long checkpointId;
+    private final int attempts;
+    private final boolean restored;
+
+    private TransactionId(
+            byte[] jobId,
+            int subtaskId,
+            int numberOfSubtasks,
+            long checkpointId,
+            int attempts,
+            boolean restored) {
+        this.jobId = jobId;
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointId = checkpointId;
+        this.attempts = attempts;
+        this.restored = restored;
+    }
+
+    public static TransactionId empty() {
+        return create(new byte[JobID.SIZE], 0, 0);
+    }
+
+    public static TransactionId create(byte[] jobId, int subtaskId, int 
numberOfSubtasks) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, -1, 0, 
false);
+    }
+
+    public static TransactionId restore(
+            byte[] jobId, int subtaskId, int numberOfSubtasks, long 
checkpointId, int attempts) {
+        return new TransactionId(jobId, subtaskId, numberOfSubtasks, 
checkpointId, attempts, true);
+    }
+
+    public static TransactionId createFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) 
throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, false);
+    }
+
+    public static TransactionId restoreFromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier) 
throws IOException {
+        return fromXid(formatId, globalTransactionId, branchQualifier, true);
+    }
+
+    public static TransactionId fromXid(
+            int formatId, byte[] globalTransactionId, byte[] branchQualifier, 
boolean restored)
+            throws IOException {
+        if (FORMAT_ID != formatId) {
+            throw new IOException(String.format("Xid formatId (%s) is not 
valid", formatId));
+        }
+
+        final DataInputDeserializer gid = new 
DataInputDeserializer(globalTransactionId);
+        byte[] jobIdBytes = readJobId(gid);
+        int subtaskId = gid.readInt();
+
+        final DataInputDeserializer branch = new 
DataInputDeserializer(branchQualifier);
+        int numberOfSubtasks = branch.readInt();
+        long checkpoint = branch.readLong();
+
+        return new TransactionId(jobIdBytes, subtaskId, numberOfSubtasks, 
checkpoint, 0, restored);
+    }
+
+    public static TransactionId deserialize(byte[] bytes) {
+        try {
+            final DataInputDeserializer in = new DataInputDeserializer(bytes);
+            byte[] jobIdBytes = readJobId(in);
+            int subtaskId = in.readInt();
+            int numberOfSubtasks = in.readInt();
+            long checkpoint = in.readLong();
+            int attempts = in.readInt();
+            return restore(jobIdBytes, subtaskId, numberOfSubtasks, 
checkpoint, attempts);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(e.getLocalizedMessage());

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to