hudi-agent commented on code in PR #18971:
URL: https://github.com/apache/hudi/pull/18971#discussion_r3408517984
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java:
##########
@@ -61,17 +62,20 @@ public class SqlSource extends RowSource {
private static final long serialVersionUID = 1L;
private final String sourceSql;
private final SparkSession spark;
+ private final HoodieIngestionMetrics metrics;
public SqlSource(
TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
- SchemaProvider schemaProvider) {
+ SchemaProvider schemaProvider,
+ HoodieIngestionMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);
checkRequiredConfigProperties(
props, Collections.singletonList(SqlSourceConfig.SOURCE_SQL));
- sourceSql = getStringWithAltKeys(props, SqlSourceConfig.SOURCE_SQL);
- spark = sparkSession;
+ this.sourceSql = getStringWithAltKeys(props, SqlSourceConfig.SOURCE_SQL);
+ this.spark = sparkSession;
+ this.metrics = metrics;
Review Comment:
🤖 The new `metrics` parameter is stored as a field but is never read
anywhere in `SqlSource` (same for `SqlFileBasedSource`). If it's reserved for a
follow-up, could the constructor change be deferred until it's actually used?
Right now this just changes a public constructor signature without functional
effect, which forces all direct callers/subclasses to update.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerMetrics.java:
##########
@@ -108,6 +108,20 @@ public void updateStreamerMetrics(long durationInNs) {
}
}
+ @Override
+ public void emitStreamerJobSuccessMetrics() {
+ if (writeConfig.isMetricsOn()) {
+ metrics.registerGauge(getMetricsName("deltastreamer", "success"), 1);
+ }
+ }
+
+ @Override
+ public void emitStreamerJobFailedMetrics() {
+ if (writeConfig.isMetricsOn()) {
+ metrics.registerGauge(getMetricsName("deltastreamer", "failure"), 1);
Review Comment:
🤖 Since `registerGauge` only ever sets the value (never resets it), once
both `emitStreamerJobSuccessMetrics` and `emitStreamerJobFailedMetrics` have
been called in the same JVM the registry will hold both
`deltastreamer.success=1` and `deltastreamer.failure=1` indefinitely. For
run-once mode this is harmless, but if these methods are ever wired into
continuous mode (or the streamer is reused for multiple runs), a monitor
reading the gauges won't be able to tell which one represents the latest
outcome. Would it make sense to also zero the opposite gauge here (e.g. emit
`success=1, failure=0` on success and vice versa) so the gauges stay mutually
exclusive?
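A minimal sketch of the mutually exclusive variant suggested above. It assumes a plain `Map` as a stand-in for the metrics registry; `OutcomeGaugesSketch`, its methods, and the literal key names are hypothetical and are not Hudi's `Metrics` API or the output of `getMetricsName`.

```java
import java.util.Map;

// Hypothetical illustration only: a Map stands in for the gauge registry.
final class OutcomeGaugesSketch {
  // Illustrative key names, taken from the comment above.
  static final String SUCCESS = "deltastreamer.success";
  static final String FAILURE = "deltastreamer.failure";

  private OutcomeGaugesSketch() {
  }

  // On success, set success=1 and explicitly zero the failure gauge.
  static void emitSuccess(Map<String, Long> registry) {
    registry.put(SUCCESS, 1L);
    registry.put(FAILURE, 0L);
  }

  // On failure, set failure=1 and explicitly zero the success gauge.
  static void emitFailure(Map<String, Long> registry) {
    registry.put(FAILURE, 1L);
    registry.put(SUCCESS, 0L);
  }
}
```

With this shape, a monitor reading either gauge sees the latest outcome only, regardless of how many runs happened in the same JVM.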
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java:
##########
@@ -924,7 +924,9 @@ private void shutdownAsyncServices(boolean error) {
public void ingestOnce() {
try {
streamSync.syncOnce();
- } catch (IOException e) {
+ streamSync.reportSuccessMetrics();
+ } catch (Exception e) {
Review Comment:
🤖 The PR description says success/failure metrics should fire "when delta
streamer job is finished", but the continuous-mode loop in `startService()`
(around lines 818–890) calls `streamSync.syncOnce()` directly rather than
`ingestOnce()`, so neither `reportSuccessMetrics()` nor
`reportFailureMetrics()` is ever invoked in continuous mode. Was that
intentional, or should the same reporting be added to the continuous loop (e.g.
on per-iteration success/failure or at shutdown via the existing `error` flag
in the finally block)?
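One possible shape for per-iteration reporting, as a rough sketch only. The `Sync` interface and `runIterations` are hypothetical stand-ins that borrow the method names from the diff; the real loop in `startService()` has more responsibilities (async services, shutdown handling) that are not modeled here.

```java
// Hypothetical illustration only: not the actual HoodieStreamer loop.
final class ContinuousReportingSketch {
  // Stand-in for the parts of StreamSync used by this sketch.
  interface Sync {
    void syncOnce() throws Exception;

    void reportSuccessMetrics();

    void reportFailureMetrics();
  }

  private ContinuousReportingSketch() {
  }

  // Runs up to maxIterations syncs, reporting the outcome of each one.
  // Stops at the first failure and returns the number of successful syncs.
  static int runIterations(Sync sync, int maxIterations) {
    int completed = 0;
    for (int i = 0; i < maxIterations; i++) {
      try {
        sync.syncOnce();
        sync.reportSuccessMetrics();
        completed++;
      } catch (Exception e) {
        sync.reportFailureMetrics();
        break;
      }
    }
    return completed;
  }
}
```

Reporting once at shutdown via the `error` flag would be the other option; this sketch only covers the per-iteration one.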
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/ingestion/HoodieIngestionMetrics.java:
##########
@@ -48,6 +48,10 @@ public HoodieIngestionMetrics(HoodieMetricsConfig writeConfig) {
public abstract void updateStreamerMetrics(long durationNanos);
+ public abstract void emitStreamerJobSuccessMetrics();
+
+ public abstract void emitStreamerJobFailedMetrics();
Review Comment:
🤖 nit: could you rename this to `emitStreamerJobFailureMetrics()` to match
the noun form used in its pair `emitStreamerJobSuccessMetrics()`? "Success" vs
"Failed" reads asymmetrically — and the implementation already uses the string
`"failure"` for the gauge key, which aligns better with `Failure` here too.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java:
##########
@@ -924,7 +924,9 @@ private void shutdownAsyncServices(boolean error) {
public void ingestOnce() {
try {
streamSync.syncOnce();
- } catch (IOException e) {
+ streamSync.reportSuccessMetrics();
+ } catch (Exception e) {
+ streamSync.reportFailureMetrics();
throw new HoodieIngestionException(String.format("Ingestion via %s failed with exception.", this.getClass()), e);
Review Comment:
🤖 Widening from `catch (IOException e)` to `catch (Exception e)` also
changes the externally-observed exception type for all runtime exceptions
thrown out of `syncOnce()` — they now get wrapped in `HoodieIngestionException`
rather than propagating as-is (the test updates for `HoodieMetaSyncException` /
`HoodieUpgradeDowngradeException` confirm this). Just want to flag the behavior
change in case any downstream caller or operator tooling catches specific
runtime types from `ingestOnce`.
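To make the behavior change concrete, here is a small self-contained sketch. `IngestionException` and `MetaSyncException` are hypothetical stand-ins for `HoodieIngestionException` and `HoodieMetaSyncException`, and `isCausedBy` is an illustrative helper, not an existing Hudi utility.

```java
import java.io.IOException;

// Hypothetical illustration only: stand-in types, not the Hudi classes.
final class CatchWideningSketch {
  interface Sync {
    void run() throws IOException;
  }

  static class IngestionException extends RuntimeException {
    IngestionException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  static class MetaSyncException extends RuntimeException {
    MetaSyncException(String message) {
      super(message);
    }
  }

  private CatchWideningSketch() {
  }

  // Before: only IOException is wrapped; runtime exceptions propagate as-is.
  static void ingestOnceBefore(Sync sync) {
    try {
      sync.run();
    } catch (IOException e) {
      throw new IngestionException("Ingestion failed", e);
    }
  }

  // After: every exception, including runtime ones, is wrapped.
  static void ingestOnceAfter(Sync sync) {
    try {
      sync.run();
    } catch (Exception e) {
      throw new IngestionException("Ingestion failed", e);
    }
  }

  // A caller that used to catch a specific type can walk the cause chain instead.
  static boolean isCausedBy(Throwable thrown, Class<? extends Throwable> type) {
    for (Throwable cur = thrown; cur != null; cur = cur.getCause()) {
      if (type.isInstance(cur)) {
        return true;
      }
    }
    return false;
  }
}
```

A caller with `catch (MetaSyncException e)` around the old shape would no longer match under the new one; it would need to catch the wrapper and inspect `getCause()`, for example through a helper like `isCausedBy`.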
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlFileBasedSource.java:
##########
@@ -65,16 +66,19 @@ public class SqlFileBasedSource extends RowSource {
private final String sourceSqlFile;
private final boolean shouldEmitCheckPoint;
Review Comment:
🤖 nit: `metrics` could be `final` here — it's set once in the constructor
and never reassigned, and the equivalent field in `SqlSource` is already
declared `final`.