phet commented on code in PR #4032:
URL: https://github.com/apache/gobblin/pull/4032#discussion_r1776056489
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -77,22 +77,11 @@ public interface DagManagementStateStore {
/**
* This marks the dag as a failed one.
- * Failed dags are queried using {@link DagManagementStateStore#getFailedDag(DagManager.DagId)} ()} later to be retried.
+ * Failed dags are queried using {@link DagManagementStateStore#getDag(DagManager.DagId)} ()} later to be retried.
Review Comment:
does it remain useful to both retrieve the DAG while also asserting that
it's failed?
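One way to keep the "retrieve and assert failed" behavior without a second store is a thin helper layered on `getDag`. The sketch below is a simplified standalone model, not Gobblin's API. `SketchDag`, `SketchStateStore`, and `InMemorySketchStateStore` are hypothetical stand-ins. The `isFailedDag()` accessor assumes a getter paired with the `setFailedDag` setter used in the new test. `IOException` handling is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Dag<JobExecutionPlan>; only the DAG-level failed flag is modeled.
class SketchDag {
  private final String id;
  private boolean failedDag;

  SketchDag(String id) {
    this.id = id;
  }

  String getId() {
    return id;
  }

  boolean isFailedDag() {
    return failedDag;
  }

  void setFailedDag(boolean failedDag) {
    this.failedDag = failedDag;
  }
}

// Hypothetical slice of DagManagementStateStore.
interface SketchStateStore {
  Optional<SketchDag> getDag(String dagId);

  // Retrieve and assert "failed" in one call: empty if the DAG is absent OR not failed.
  default Optional<SketchDag> getFailedDag(String dagId) {
    return getDag(dagId).filter(SketchDag::isFailedDag);
  }
}

// In-memory implementation so the sketch runs on its own.
class InMemorySketchStateStore implements SketchStateStore {
  private final Map<String, SketchDag> dags = new HashMap<>();

  void put(SketchDag dag) {
    dags.put(dag.getId(), dag);
  }

  @Override
  public Optional<SketchDag> getDag(String dagId) {
    return Optional.ofNullable(dags.get(dagId));
  }
}
```

With a default method like this, `ResumeDagProc.initialize` could keep calling a failed-only lookup and get an empty result for a DAG that exists but never failed.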
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -167,7 +167,7 @@ public void cleanUp(String dagId) throws IOException {
@Override
public List<Dag<JobExecutionPlan>> getDags() throws IOException {
throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
- + "the DagManager that is replaced by DagProcessingEngine"); }
+ + "the DagManager that is replaced by DagProcessingEngine");}
Review Comment:
actually, doesn't this need a newline before `}`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -147,21 +140,10 @@ public void deleteDag(DagManager.DagId dagId) throws IOException {
log.info("Deleted dag {}", dagId);
}
- @Override
- public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
- this.failedDagStateStore.cleanUp(dagId);
- log.info("Deleted failed dag {}", dagId);
- }
-
- @Override
- public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws IOException {
- return Optional.of(this.failedDagStateStore.getDag(dagId));
- }
-
@Override
public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId)
throws IOException {
- this.dagStateStore.updateDagNode(dagId, dagNode);
+ this.dagStateStore.updateDagNode(dagId, dagNode, false);// isFailedDag is set as false because addDagNodeState adds a new DagNode, doesn't update an existing dagNode as failed.
Review Comment:
nit: add a space before starting a comment. also, aim for more brevity; e.g.:
```
// create all DagNodes as isFailedDag == false
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -91,8 +88,6 @@ public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, User
private synchronized void start() {
if (!dagStoresInitialized) {
this.dagStateStore = createDagStateStore(config, topologySpecMap);
- this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
Review Comment:
any ideas on handling migration when we roll this out (presuming the failed
DagStateStore was not empty)?
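A possible migration path, sketched under two assumptions: the legacy failed-DAG store can still enumerate its DAGs (shown as a `getDags()`-style call), and writing a DAG with its failed flag set into the main store is an upsert. `MigrationDag`, `LegacyDagStore`, `DagStore`, and `FailedDagMigration` are hypothetical names, not existing Gobblin classes. Each DAG is written to the new store before it is deleted from the old one. A crash mid-migration therefore leaves a DAG in both stores rather than in neither, and re-running the migration is harmless.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal DAG model: an id plus the DAG-level failed flag.
class MigrationDag {
  final String id;
  boolean failedDag;

  MigrationDag(String id) {
    this.id = id;
  }
}

// Hypothetical view of the legacy failed-DAG store.
interface LegacyDagStore {
  List<MigrationDag> getDags();

  void cleanUp(String dagId);
}

// Hypothetical view of the single remaining DAG store; writeCheckpoint is assumed to upsert.
interface DagStore {
  void writeCheckpoint(MigrationDag dag);
}

final class FailedDagMigration {
  private FailedDagMigration() {}

  // Moves every legacy failed DAG into the main store with its failed flag set; returns how many moved.
  static int migrate(LegacyDagStore legacyFailedStore, DagStore dagStore) {
    int migrated = 0;
    for (MigrationDag dag : legacyFailedStore.getDags()) {
      dag.failedDag = true;               // presence in the legacy store implied "failed"
      dagStore.writeCheckpoint(dag);      // write to the new home first...
      legacyFailedStore.cleanUp(dag.id);  // ...then delete, so a crash never loses a DAG
      migrated++;
    }
    return migrated;
  }
}

// In-memory implementations so the sketch runs on its own.
class InMemoryLegacyDagStore implements LegacyDagStore {
  final Map<String, MigrationDag> dags = new LinkedHashMap<>();

  @Override
  public List<MigrationDag> getDags() {
    return new ArrayList<>(dags.values()); // snapshot, so cleanUp during iteration is safe
  }

  @Override
  public void cleanUp(String dagId) {
    dags.remove(dagId);
  }
}

class InMemoryDagStore implements DagStore {
  final Map<String, MigrationDag> dags = new LinkedHashMap<>();

  @Override
  public void writeCheckpoint(MigrationDag dag) {
    dags.put(dag.id, dag);
  }
}
```

Running something like this once at startup, before the legacy store is dropped from config, would let the rollout proceed even if the failed store still holds DAGs.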
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java:
##########
@@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
try (Connection connection = dataSource.getConnection();
- PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
+ PreparedStatement createStatement = connection.prepareStatement(
+ String.format(CREATE_TABLE_STATEMENT, tableName))) {
Review Comment:
given Arjun just wrote this class a month or two back, please ensure your auto-formatting is what it's supposed to be. it is possible his was off, but let's check. sure, we might fix spelling errors, but there should be little reason to reformat files we've only just created.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java:
##########
@@ -137,4 +152,56 @@ public void testAddGetAndDeleteDag() throws Exception{
Assert.assertNull(this.dagStateStore.getDag(dagId1));
Assert.assertNull(this.dagStateStore.getDag(dagId2));
}
+
+ @Test
+ public void testMarkDagAsFailed() throws Exception {
+ // Set up initial conditions
+ Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test_dag", 789L);
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+
+ this.dagStateStore.writeCheckpoint(dag);
+
+ // Fetch all initial states into a list
+ List<Boolean> initialStates = fetchDagNodeStates(dagId.toString());
+
+ // Check Initial State
+ for (Boolean state : initialStates) {
+ Assert.assertFalse(state);
+ }
+ // Set the DAG as failed
+ dag.setFailedDag(true);
+ this.dagStateStore.writeCheckpoint(dag);
+
+ // Fetch all states after marking the DAG as failed
+ List<Boolean> failedStates = fetchDagNodeStates(dagId.toString());
+
+ // Check if all states are now true (indicating failure)
+ for (Boolean state : failedStates) {
+ Assert.assertTrue(state);
+ }
+ dagStateStore.cleanUp(dagId);
+ Assert.assertNull(this.dagStateStore.getDag(dagId));
+ }
+
+ private List<Boolean> fetchDagNodeStates(String dagId) throws IOException {
+ List<Boolean> states = new ArrayList<>();
+
+ dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> {
Review Comment:
is behind-the-scenes DB access the only way to validate behavior here? is there no way to retrieve the DAG from the "official" DagStateStore, then mark it failed, and finally re-read it from the DSS to verify all nodes have changed?
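A sketch of the round trip the comment suggests, using only store-level calls: write, read back, mark failed, write again, read back. To run standalone it uses a hypothetical in-memory `RowPerNodeStore` that keeps one row per `DagNode`, mimicking the per-node failed column the test currently queries directly. It assumes `getDag` reports a DAG as failed only when every stored node row is flagged. In the real test, `this.dagStateStore` and `Dag<JobExecutionPlan>` would take the place of these stand-ins.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Dag<JobExecutionPlan>: an id, its node names, and a DAG-level failed flag.
class RoundTripDag {
  final String id;
  final List<String> nodeNames;
  boolean failedDag;

  RoundTripDag(String id, List<String> nodeNames) {
    this.id = id;
    this.nodeNames = new ArrayList<>(nodeNames);
  }
}

// Hypothetical in-memory store keeping one row per DagNode, each row carrying the failed flag.
class RowPerNodeStore {
  private static final class NodeRow {
    final String nodeName;
    final boolean failedDag;

    NodeRow(String nodeName, boolean failedDag) {
      this.nodeName = nodeName;
      this.failedDag = failedDag;
    }
  }

  private final Map<String, List<NodeRow>> rowsByDag = new HashMap<>();

  void writeCheckpoint(RoundTripDag dag) {
    List<NodeRow> rows = new ArrayList<>();
    for (String nodeName : dag.nodeNames) {
      rows.add(new NodeRow(nodeName, dag.failedDag));
    }
    rowsByDag.put(dag.id, rows);
  }

  // Rebuilds the DAG; it counts as failed only if every node row is flagged.
  Optional<RoundTripDag> getDag(String dagId) {
    List<NodeRow> rows = rowsByDag.get(dagId);
    if (rows == null) {
      return Optional.empty();
    }
    List<String> nodeNames = new ArrayList<>();
    boolean allFailed = !rows.isEmpty();
    for (NodeRow row : rows) {
      nodeNames.add(row.nodeName);
      allFailed &= row.failedDag;
    }
    RoundTripDag dag = new RoundTripDag(dagId, nodeNames);
    dag.failedDag = allFailed;
    return Optional.of(dag);
  }
}

final class MarkFailedRoundTrip {
  private MarkFailedRoundTrip() {}

  // True if the DAG starts out not failed and reads back as failed on every node after marking.
  static boolean run(RowPerNodeStore store, RoundTripDag dag) {
    store.writeCheckpoint(dag);
    RoundTripDag before = store.getDag(dag.id).orElseThrow(IllegalStateException::new);
    if (before.failedDag) {
      return false;
    }
    before.failedDag = true;
    store.writeCheckpoint(before);
    RoundTripDag after = store.getDag(dag.id).orElseThrow(IllegalStateException::new);
    return after.failedDag && after.nodeNames.equals(dag.nodeNames);
  }
}
```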
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java:
##########
@@ -54,7 +54,7 @@ public ResumeDagProc(ResumeDagTask resumeDagTask, Config config) {
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
throws IOException {
- return dagManagementStateStore.getFailedDag(getDagId());
+ return dagManagementStateStore.getDag(getDagId());
Review Comment:
verify it's actually failed?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java:
##########
@@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends DagStateStore {
* Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one
* <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
*/
- int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
+ int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag) throws IOException;
Review Comment:
wondering about the semantics, then. here we have a method to update a `DagNode`, but it takes a flag about whether its parent, the DAG, is failed. does `true` here mean, in the course of updating the `DagNode`, to mark the entire DAG failed? (I can imagine that failure in one of the nodes would render the entire DAG failed.)
that said, what if there are multiple concurrent jobs, each corresponding to a `DagNode`, and they all complete at about the same time, with several succeeding but one failing? is there a race condition where, unless the failed `DagNode` is processed last, a successful `DagNode` might get processed just after the failure and reset the DAG back to NOT failed?
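One way to close that race is to make the failed flag sticky: an update may set it but never clear it. The standalone sketch below contrasts a last-writer-wins update with an atomic OR-merge. `StickyFailedFlagStore` and its methods are hypothetical. In MySQL, the analogous change would be something along the lines of `ON DUPLICATE KEY UPDATE is_failed_dag = is_failed_dag OR VALUES(is_failed_dag)`, where `is_failed_dag` is a hypothetical column name. With that clause, a later successful `DagNode` update could not reset a failure recorded by an earlier one.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of a DAG-level failed flag updated as DagNodes complete.
final class StickyFailedFlagStore {
  private final ConcurrentMap<String, Boolean> failedByDag = new ConcurrentHashMap<>();

  // Last-writer-wins: a success recorded after a failure silently resets the DAG to "not failed".
  void updateOverwriting(String dagId, boolean isFailedDag) {
    failedByDag.put(dagId, isFailedDag);
  }

  // Sticky: ConcurrentHashMap.merge is atomic per key, and OR-ing means true never reverts to false.
  void updateSticky(String dagId, boolean isFailedDag) {
    failedByDag.merge(dagId, isFailedDag, Boolean::logicalOr);
  }

  boolean isFailed(String dagId) {
    return failedByDag.getOrDefault(dagId, false);
  }
}
```

A sticky flag also means that resuming a DAG needs an explicit way to clear it, since ordinary node updates no longer can.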