hudi-agent commented on code in PR #18971:
URL: https://github.com/apache/hudi/pull/18971#discussion_r3408517984


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java:
##########
@@ -61,17 +62,20 @@ public class SqlSource extends RowSource {
   private static final long serialVersionUID = 1L;
   private final String sourceSql;
   private final SparkSession spark;
+  private final HoodieIngestionMetrics metrics;
 
   public SqlSource(
       TypedProperties props,
       JavaSparkContext sparkContext,
       SparkSession sparkSession,
-      SchemaProvider schemaProvider) {
+      SchemaProvider schemaProvider,
+      HoodieIngestionMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);
     checkRequiredConfigProperties(
         props, Collections.singletonList(SqlSourceConfig.SOURCE_SQL));
-    sourceSql = getStringWithAltKeys(props, SqlSourceConfig.SOURCE_SQL);
-    spark = sparkSession;
+    this.sourceSql = getStringWithAltKeys(props, SqlSourceConfig.SOURCE_SQL);
+    this.spark = sparkSession;
+    this.metrics = metrics;

Review Comment:
   🤖 The new `metrics` parameter is stored as a field but is never read 
anywhere in `SqlSource` (same for `SqlFileBasedSource`). If it's reserved for a 
follow-up, could the constructor change be deferred until it's actually used? 
Right now this just changes a public constructor signature without functional 
effect, which forces all direct callers/subclasses to update.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java:
##########
@@ -108,6 +108,20 @@ public void updateStreamerMetrics(long durationInNs) {
     }
   }
 
+  @Override
+  public void emitStreamerJobSuccessMetrics() {
+    if (writeConfig.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "success"), 1);
+    }
+  }
+
+  @Override
+  public void emitStreamerJobFailedMetrics() {
+    if (writeConfig.isMetricsOn()) {
+      metrics.registerGauge(getMetricsName("deltastreamer", "failure"), 1);

Review Comment:
   🤖 Since `registerGauge` only ever sets the value (never resets it), once 
both `emitStreamerJobSuccessMetrics` and `emitStreamerJobFailedMetrics` have 
been called in the same JVM the registry will hold both 
`deltastreamer.success=1` and `deltastreamer.failure=1` indefinitely. For 
run-once mode this is harmless, but if these methods are ever wired into 
continuous mode (or the streamer is reused for multiple runs), a monitor 
reading the gauges won't be able to tell which one represents the latest 
outcome. Would it make sense to also zero the opposite gauge here (e.g. emit 
`success=1, failure=0` on success and vice versa) so the gauges stay mutually 
exclusive?
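
A minimal sketch of the mutually exclusive variant, under stated assumptions: `GaugeRegistry` and `ExclusiveOutcomeMetrics` are hypothetical stand-ins and not Hudi classes, and the real `registerGauge` / `getMetricsName` calls are not reproduced. Only the gauge names mirror the ones discussed above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a metrics registry: set() only overwrites the
// named gauge and never touches any other gauge.
final class GaugeRegistry {
  private final Map<String, Long> gauges = new ConcurrentHashMap<>();

  void set(String name, long value) {
    gauges.put(name, value);
  }

  // Returns null if the gauge was never set.
  Long get(String name) {
    return gauges.get(name);
  }
}

// Hypothetical emitter: each outcome sets its own gauge to 1 and zeroes the
// opposite gauge, so the pair always reflects the most recent run.
final class ExclusiveOutcomeMetrics {
  static final String SUCCESS = "deltastreamer.success";
  static final String FAILURE = "deltastreamer.failure";

  private final GaugeRegistry registry;

  ExclusiveOutcomeMetrics(GaugeRegistry registry) {
    this.registry = registry;
  }

  void emitSuccess() {
    registry.set(SUCCESS, 1);
    registry.set(FAILURE, 0);
  }

  void emitFailure() {
    registry.set(SUCCESS, 0);
    registry.set(FAILURE, 1);
  }
}
```

With this shape, a failed run followed by a successful one leaves `success=1, failure=0`, so a monitor can read either gauge on its own.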
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java:
##########
@@ -924,7 +924,9 @@ private void shutdownAsyncServices(boolean error) {
     public void ingestOnce() {
       try {
         streamSync.syncOnce();
-      } catch (IOException e) {
+        streamSync.reportSuccessMetrics();
+      } catch (Exception e) {

Review Comment:
   🤖 The PR description says success/failure metrics should fire "when delta 
streamer job is finished", but the continuous-mode loop in `startService()` 
(around lines 818–890) calls `streamSync.syncOnce()` directly rather than 
`ingestOnce()`, so neither `reportSuccessMetrics()` nor 
`reportFailureMetrics()` is ever invoked in continuous mode. Was that 
intentional, or should the same reporting be added to the continuous loop (e.g. 
on per-iteration success/failure or at shutdown via the existing `error` flag 
in the finally block)?
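
For illustration, per-iteration reporting could look roughly like the sketch below. It is not the actual `startService()` loop: `ContinuousLoopSketch`, its `Sync` and `Reporter` interfaces, and the `keepRunning` condition are all hypothetical, and the choice to stop at the first failure is an assumption.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a continuous loop that reports every round's outcome.
final class ContinuousLoopSketch {
  interface Sync {
    void syncOnce();
  }

  interface Reporter {
    void reportSuccess();

    void reportFailure();
  }

  // Runs sync rounds while keepRunning returns true. Reports success after
  // each completed round; on the first runtime failure, reports it and
  // rethrows. Returns the number of rounds that completed successfully.
  static int run(Sync sync, Reporter reporter, BooleanSupplier keepRunning) {
    int completed = 0;
    while (keepRunning.getAsBoolean()) {
      try {
        sync.syncOnce();
      } catch (RuntimeException e) {
        reporter.reportFailure();
        throw e;
      }
      reporter.reportSuccess();
      completed++;
    }
    return completed;
  }
}
```

Reporting only once at shutdown would instead move the two reporter calls out of the loop and key them off the final error state.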
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java:
##########
@@ -48,6 +48,10 @@ public HoodieIngestionMetrics(HoodieMetricsConfig writeConfig) {
 
   public abstract void updateStreamerMetrics(long durationNanos);
 
+  public abstract void emitStreamerJobSuccessMetrics();
+
+  public abstract void emitStreamerJobFailedMetrics();

Review Comment:
   🤖 nit: could you rename this to `emitStreamerJobFailureMetrics()` to match 
the noun form used in its pair `emitStreamerJobSuccessMetrics()`? "Success" vs 
"Failed" reads asymmetrically — and the implementation already uses the string 
`"failure"` for the gauge key, which aligns better with `Failure` here too.
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java:
##########
@@ -924,7 +924,9 @@ private void shutdownAsyncServices(boolean error) {
     public void ingestOnce() {
       try {
         streamSync.syncOnce();
-      } catch (IOException e) {
+        streamSync.reportSuccessMetrics();
+      } catch (Exception e) {
+        streamSync.reportFailureMetrics();
         throw new HoodieIngestionException(String.format("Ingestion via %s failed with exception.", this.getClass()), e);

Review Comment:
   🤖 Widening from `catch (IOException e)` to `catch (Exception e)` also 
changes the externally-observed exception type for all runtime exceptions 
thrown out of `syncOnce()` — they now get wrapped in `HoodieIngestionException` 
rather than propagating as-is (the test updates for `HoodieMetaSyncException` / 
`HoodieUpgradeDowngradeException` confirm this). Just want to flag the behavior 
change in case any downstream caller or operator tooling catches specific 
runtime types from `ingestOnce`.
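
If preserving the previously observed exception types matters, one possible shape is to report the failure for every exception but wrap only `IOException`. The sketch below uses hypothetical stand-ins (`SyncStep`, `OutcomeReporter`, `IngestionFailedException`), not Hudi's actual classes, and is only one way to split the two concerns.

```java
import java.io.IOException;

// Hypothetical stand-in for the wrapping exception type.
final class IngestionFailedException extends RuntimeException {
  IngestionFailedException(String message, Throwable cause) {
    super(message, cause);
  }
}

// Hypothetical stand-in for the sync call that may throw IOException.
interface SyncStep {
  void syncOnce() throws IOException;
}

// Hypothetical stand-in for the success/failure metric reporting.
interface OutcomeReporter {
  void reportSuccess();

  void reportFailure();
}

final class IngestOnceSketch {
  // Reports failure for every exception, but wraps only IOException, so
  // runtime exceptions reach the caller with their original type.
  static void ingestOnce(SyncStep sync, OutcomeReporter reporter) {
    try {
      sync.syncOnce();
    } catch (IOException e) {
      reporter.reportFailure();
      throw new IngestionFailedException("Ingestion failed with exception.", e);
    } catch (RuntimeException e) {
      reporter.reportFailure();
      throw e;
    }
    // Success is reported outside the try block, so an exception thrown by
    // the reporter itself is not counted as an ingestion failure.
    reporter.reportSuccess();
  }
}
```

Callers that catch a specific runtime type would keep working under this variant, while the failure metric still fires on every error path.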
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlFileBasedSource.java:
##########
@@ -65,16 +66,19 @@ public class SqlFileBasedSource extends RowSource {
 
   private final String sourceSqlFile;
   private final boolean shouldEmitCheckPoint;

Review Comment:
   🤖 nit: `metrics` could be `final` here — it's set once in the constructor 
and never reassigned, and the equivalent field in `SqlSource` is already 
declared `final`.
   


