[
https://issues.apache.org/jira/browse/GOBBLIN-2143?focusedWorklogId=931851&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-931851
]
ASF GitHub Bot logged work on GOBBLIN-2143:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Aug/24 06:34
Start Date: 27/Aug/24 06:34
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4038:
URL: https://github.com/apache/gobblin/pull/4038#discussion_r1732108773
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java:
##########
@@ -35,12 +35,10 @@
public interface DagStateStoreWithDagNodes extends DagStateStore {
/**
- * Updates a dag node identified by the provided {@link DagManager.DagId}
- * with the given {@link Dag.DagNode}.
- * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and
0 if new dag node is same as the existing one
- * <a
href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
+ * Updates the {@link Dag.DagNode} with the provided value.
+ * Returns 1 if the dag node is updated successfully, 0 otherwise
Review Comment:
Why a 0/1 int rather than a boolean? Are you expecting to increase to a
larger number eventually?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -126,18 +126,32 @@ public MysqlDagStateStoreWithDagNodes(Config config,
Map<URI, TopologySpec> topo
}
@Override
- public void writeCheckpoint(Dag<JobExecutionPlan> dag)
- throws IOException {
- DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
- boolean newDag = false;
- for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
- if (updateDagNode(dagId, dagNode) == 1) {
- newDag = true;
+ public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
tableName), insertStatement -> {
+ int dagSize = dag.getNodes().size();
+ Object[][] data = new Object[dagSize][3];
+
+ for (int i=0; i<dagSize; i++) {
+ Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(i);
+ data[i][0] = dagNode.getValue().getId().toString();
+ data[i][1] = dagId;
+ data[i][2]
=this.serDe.serialize(Collections.singletonList(dagNode.getValue()));
}
- }
- if (newDag) {
- this.totalDagCount.inc();
- }
+
+ for (Object[] row : data) {
+ insertStatement.setObject(1, row[0]);
+ insertStatement.setObject(2, row[1]);
+ insertStatement.setObject(3, row[2]);
+ insertStatement.addBatch();
+ }
+ try {
+ return insertStatement.executeBatch();
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure adding dag for %s",
dagId), e);
+ }}, true);
+
+ this.totalDagCount.inc();
Review Comment:
Is this an in-memory counter? What's the purpose?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -126,18 +126,32 @@ public MysqlDagStateStoreWithDagNodes(Config config,
Map<URI, TopologySpec> topo
}
@Override
- public void writeCheckpoint(Dag<JobExecutionPlan> dag)
- throws IOException {
- DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
- boolean newDag = false;
- for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
- if (updateDagNode(dagId, dagNode) == 1) {
- newDag = true;
+ public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
tableName), insertStatement -> {
+ int dagSize = dag.getNodes().size();
+ Object[][] data = new Object[dagSize][3];
+
+ for (int i=0; i<dagSize; i++) {
+ Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(i);
+ data[i][0] = dagNode.getValue().getId().toString();
+ data[i][1] = dagId;
+ data[i][2]
=this.serDe.serialize(Collections.singletonList(dagNode.getValue()));
Review Comment:
A few of these need a space on either side of the operator (e.g. `i=0`,
`i<dagSize`, and the `=this.serDe...` assignment).
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -126,18 +126,32 @@ public MysqlDagStateStoreWithDagNodes(Config config,
Map<URI, TopologySpec> topo
}
@Override
- public void writeCheckpoint(Dag<JobExecutionPlan> dag)
- throws IOException {
- DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
- boolean newDag = false;
- for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
- if (updateDagNode(dagId, dagNode) == 1) {
- newDag = true;
+ public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
tableName), insertStatement -> {
+ int dagSize = dag.getNodes().size();
+ Object[][] data = new Object[dagSize][3];
+
+ for (int i=0; i<dagSize; i++) {
+ Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(i);
+ data[i][0] = dagNode.getValue().getId().toString();
+ data[i][1] = dagId;
+ data[i][2]
=this.serDe.serialize(Collections.singletonList(dagNode.getValue()));
}
- }
- if (newDag) {
- this.totalDagCount.inc();
- }
+
+ for (Object[] row : data) {
Review Comment:
Why iterate through the nodes to populate `data` when the only thing done
with it is to iterate over it again here? Instead, just iterate through the
`DagNode`s and populate `insertStatement` directly.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -142,8 +142,11 @@ public void markDagFailed(DagManager.DagId dagId) throws
IOException {
@Override
public void deleteDag(DagManager.DagId dagId) throws IOException {
- this.dagStateStore.cleanUp(dagId);
- log.info("Deleted dag {}", dagId);
+ if (this.dagStateStore.cleanUp(dagId)) {
+ log.info("Deleted dag {}", dagId);
+ } else {
+ log.info("Dag deletion was tried but did not happen {}", dagId);
Review Comment:
Only `.info` level, not `.warn` or even `.error`? Alternatively, this could
arguably be an exception.
Issue Time Tracking
-------------------
Worklog Id: (was: 931851)
Remaining Estimate: 0h
Time Spent: 10m
> handle concurrent ReevaluateDagProc for cancelled dag nodes correctly
> ---------------------------------------------------------------------
>
> Key: GOBBLIN-2143
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2143
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)