phet commented on code in PR #4038:
URL: https://github.com/apache/gobblin/pull/4038#discussion_r1732108773
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java:
##########
@@ -35,12 +35,10 @@
public interface DagStateStoreWithDagNodes extends DagStateStore {
/**
- * Updates a dag node identified by the provided {@link DagManager.DagId}
- * with the given {@link Dag.DagNode}.
- * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and
0 if new dag node is same as the existing one
- * <a
href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
+ * Updates the {@link Dag.DagNode} with the provided value.
+ * Returns 1 if the dag node is updated successfully, 0 otherwise
Review Comment:
Why return a 0/1 int rather than a boolean? Are you expecting the return
value to grow beyond 1 eventually?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -126,18 +126,32 @@ public MysqlDagStateStoreWithDagNodes(Config config,
Map<URI, TopologySpec> topo
}
@Override
- public void writeCheckpoint(Dag<JobExecutionPlan> dag)
- throws IOException {
- DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
- boolean newDag = false;
- for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
- if (updateDagNode(dagId, dagNode) == 1) {
- newDag = true;
+ public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
tableName), insertStatement -> {
+ int dagSize = dag.getNodes().size();
+ Object[][] data = new Object[dagSize][3];
+
+ for (int i=0; i<dagSize; i++) {
+ Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(i);
+ data[i][0] = dagNode.getValue().getId().toString();
+ data[i][1] = dagId;
+ data[i][2]
=this.serDe.serialize(Collections.singletonList(dagNode.getValue()));
}
- }
- if (newDag) {
- this.totalDagCount.inc();
- }
+
+ for (Object[] row : data) {
+ insertStatement.setObject(1, row[0]);
+ insertStatement.setObject(2, row[1]);
+ insertStatement.setObject(3, row[2]);
+ insertStatement.addBatch();
+ }
+ try {
+ return insertStatement.executeBatch();
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure adding dag for %s",
dagId), e);
+ }}, true);
+
+ this.totalDagCount.inc();
Review Comment:
Is this an in-memory counter? What is its purpose?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -126,18 +126,32 @@ public MysqlDagStateStoreWithDagNodes(Config config,
Map<URI, TopologySpec> topo
}
@Override
- public void writeCheckpoint(Dag<JobExecutionPlan> dag)
- throws IOException {
- DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
- boolean newDag = false;
- for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
- if (updateDagNode(dagId, dagNode) == 1) {
- newDag = true;
+ public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
tableName), insertStatement -> {
+ int dagSize = dag.getNodes().size();
+ Object[][] data = new Object[dagSize][3];
+
+ for (int i=0; i<dagSize; i++) {
+ Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(i);
+ data[i][0] = dagNode.getValue().getId().toString();
+ data[i][1] = dagId;
+ data[i][2]
=this.serDe.serialize(Collections.singletonList(dagNode.getValue()));
Review Comment:
A few of these need a space on either side of the operator (e.g. `i=0`, `i<dagSize`, and `=this.serDe...`).
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -126,18 +126,32 @@ public MysqlDagStateStoreWithDagNodes(Config config,
Map<URI, TopologySpec> topo
}
@Override
- public void writeCheckpoint(Dag<JobExecutionPlan> dag)
- throws IOException {
- DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
- boolean newDag = false;
- for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
- if (updateDagNode(dagId, dagNode) == 1) {
- newDag = true;
+ public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
tableName), insertStatement -> {
+ int dagSize = dag.getNodes().size();
+ Object[][] data = new Object[dagSize][3];
+
+ for (int i=0; i<dagSize; i++) {
+ Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(i);
+ data[i][0] = dagNode.getValue().getId().toString();
+ data[i][1] = dagId;
+ data[i][2]
=this.serDe.serialize(Collections.singletonList(dagNode.getValue()));
}
- }
- if (newDag) {
- this.totalDagCount.inc();
- }
+
+ for (Object[] row : data) {
Review Comment:
Why iterate once to populate `data` when the only thing done with it is to
iterate over it again here? Instead, just iterate through the `DagNode`s and
populate `insertStatement` directly.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -142,8 +142,11 @@ public void markDagFailed(DagManager.DagId dagId) throws
IOException {
@Override
public void deleteDag(DagManager.DagId dagId) throws IOException {
- this.dagStateStore.cleanUp(dagId);
- log.info("Deleted dag {}", dagId);
+ if (this.dagStateStore.cleanUp(dagId)) {
+ log.info("Deleted dag {}", dagId);
+ } else {
+ log.info("Dag deletion was tried but did not happen {}", dagId);
Review Comment:
Only `info` level, not `warn` or even `error`? Alternatively, this could
arguably be an exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]