This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ab8072375 [client] Improve: add detailed context to exception messages (#1806)
ab8072375 is described below
commit ab8072375ece990105880ad3a0504a8433bd60e2
Author: CodeDrinks <[email protected]>
AuthorDate: Tue Oct 14 16:42:03 2025 +0530
[client] Improve: add detailed context to exception messages (#1806)
---
.../fluss/client/table/scanner/TableScan.java | 25 +++++++--
.../apache/fluss/client/write/WriterClient.java | 59 +++++++++++++++-------
2 files changed, 60 insertions(+), 24 deletions(-)
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
index 1cfe3aa5f..7fade0547 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
@@ -74,7 +74,11 @@ public class TableScan implements Scan {
int index = rowType.getFieldIndex(projectedColumnNames.get(i));
if (index < 0) {
throw new IllegalArgumentException(
- "Field " + projectedColumnNames.get(i) + " not found
in table schema.");
+ String.format(
+ "Field '%s' not found in table schema.
Available fields: %s, Table: %s",
+ projectedColumnNames.get(i),
+ rowType.getFieldNames(),
+ tableInfo.getTablePath()));
}
columnIndexes[i] = index;
}
@@ -89,7 +93,10 @@ public class TableScan implements Scan {
@Override
public LogScanner createLogScanner() {
if (limit != null) {
- throw new UnsupportedOperationException("LogScanner doesn't
support limit pushdown.");
+ throw new UnsupportedOperationException(
+ String.format(
+ "LogScanner doesn't support limit pushdown. Table:
%s, requested limit: %d",
+ tableInfo.getTablePath(), limit));
}
return new LogScannerImpl(
conn.getConfiguration(),
@@ -104,7 +111,9 @@ public class TableScan implements Scan {
public BatchScanner createBatchScanner(TableBucket tableBucket) {
if (limit == null) {
throw new UnsupportedOperationException(
- "Currently, BatchScanner is only available when limit is
set.");
+ String.format(
+ "Currently, BatchScanner is only available when
limit is set. Table: %s, bucket: %s",
+ tableInfo.getTablePath(), tableBucket));
}
return new LimitBatchScanner(
tableInfo, tableBucket, conn.getMetadataUpdater(),
projectedColumns, limit);
@@ -114,7 +123,9 @@ public class TableScan implements Scan {
public BatchScanner createBatchScanner(TableBucket tableBucket, long
snapshotId) {
if (limit != null) {
throw new UnsupportedOperationException(
- "Currently, SnapshotBatchScanner doesn't support limit
pushdown.");
+ String.format(
+ "Currently, SnapshotBatchScanner doesn't support
limit pushdown. Table: %s, bucket: %s, snapshot ID: %d, requested limit: %d",
+ tableInfo.getTablePath(), tableBucket, snapshotId,
limit));
}
String scannerTmpDir =
conn.getConfiguration().getString(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR);
@@ -123,7 +134,11 @@ public class TableScan implements Scan {
try {
snapshotMeta = admin.getKvSnapshotMetadata(tableBucket,
snapshotId).get();
} catch (Exception e) {
- throw new FlussRuntimeException("Failed to get snapshot metadata",
e);
+ throw new FlussRuntimeException(
+ String.format(
+ "Failed to get snapshot metadata for table bucket
%s, snapshot ID: %d, Table: %s",
+ tableBucket, snapshotId, tableInfo.getTablePath()),
+ e);
}
return new KvSnapshotBatchScanner(
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index 0af0b2859..e91c04843 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -92,12 +92,16 @@ public class WriterClient {
MetadataUpdater metadataUpdater,
ClientMetricGroup clientMetricGroup,
Admin admin) {
+ int maxRequestSizeLocal = -1;
+ IdempotenceManager idempotenceManagerLocal = null;
try {
this.conf = conf;
this.metadataUpdater = metadataUpdater;
- this.maxRequestSize =
+ maxRequestSizeLocal =
(int)
conf.get(ConfigOptions.CLIENT_WRITER_REQUEST_MAX_SIZE).getBytes();
- this.idempotenceManager = buildIdempotenceManager();
+ this.maxRequestSize = maxRequestSizeLocal;
+ idempotenceManagerLocal = buildIdempotenceManager();
+ this.idempotenceManager = idempotenceManagerLocal;
this.writerMetricGroup = new WriterMetricGroup(clientMetricGroup);
short acks =
configureAcks(idempotenceManager.idempotenceEnabled());
@@ -117,7 +121,14 @@ public class WriterClient {
this::maybeAbortBatches);
} catch (Throwable t) {
close(Duration.ofMillis(0));
- throw new FlussRuntimeException("Failed to construct writer", t);
+ throw new FlussRuntimeException(
+ String.format(
+ "Failed to construct writer. Max request size: %d
bytes, Idempotence enabled: %b",
+ maxRequestSizeLocal,
+ idempotenceManagerLocal != null
+ ?
idempotenceManagerLocal.idempotenceEnabled()
+ : false),
+ t);
}
}
@@ -148,7 +159,11 @@ public class WriterClient {
try {
accumulator.awaitFlushCompletion();
} catch (InterruptedException e) {
- throw new FlussRuntimeException("Flush interrupted." + e);
+ throw new FlussRuntimeException(
+ String.format(
+ "Flush interrupted after %d ms. Writer may be in
inconsistent state",
+ System.currentTimeMillis() - start),
+ e);
}
LOG.trace(
"Flushed accumulated records in writer in {} ms.",
@@ -196,7 +211,12 @@ public class WriterClient {
// TODO add the wakeup logic refer to Kafka.
}
} catch (Exception e) {
- throw new FlussRuntimeException(e);
+ throw new FlussRuntimeException(
+ String.format(
+ "Failed to send record to table %s. Writer state:
%s",
+ record.getPhysicalTablePath(),
+ sender != null && sender.isRunning() ? "running" :
"closed"),
+ e);
}
}
@@ -212,7 +232,10 @@ public class WriterClient {
private void throwIfWriterClosed() {
if (sender == null || !sender.isRunning()) {
throw new IllegalStateException(
- "Cannot perform operation after writer has been closed");
+ String.format(
+ "Cannot perform write operation after writer has
been closed. Sender running: %b, Thread pool shutdown: %b",
+ sender != null && sender.isRunning(),
+ ioThreadPool == null ||
ioThreadPool.isShutdown()));
}
}
@@ -225,11 +248,11 @@ public class WriterClient {
&& maxInflightRequestPerBucket
> MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE) {
throw new IllegalConfigurationException(
- "The value of "
- +
ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET.key()
- + " should be less than or equal to "
- + MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE
- + " when idempotence writer enabled to ensure
message ordering.");
+ String.format(
+ "Invalid configuration for idempotent writer. The
value of %s (%d) should be less than or equal to %d when idempotence is enabled
to ensure message ordering",
+
ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET.key(),
+ maxInflightRequestPerBucket,
+
MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE));
}
TabletServerGateway tabletServerGateway =
metadataUpdater.newRandomTabletServerClient();
@@ -249,10 +272,9 @@ public class WriterClient {
if (idempotenceEnabled && ack != -1) {
throw new IllegalConfigurationException(
- "Must set "
- + ConfigOptions.CLIENT_WRITER_ACKS.key()
- + " to 'all' in order to use the idempotent
writer. Otherwise "
- + "we cannot guarantee idempotence.");
+ String.format(
+ "Invalid acks configuration for idempotent writer.
Must set %s to 'all' (current value: '%s') in order to use the idempotent
writer. Otherwise we cannot guarantee idempotence",
+ ConfigOptions.CLIENT_WRITER_ACKS.key(), acks));
}
return ack;
@@ -262,10 +284,9 @@ public class WriterClient {
int retries = conf.getInt(ConfigOptions.CLIENT_WRITER_RETRIES);
if (idempotenceEnabled && retries == 0) {
throw new IllegalConfigurationException(
- "Must set "
- + ConfigOptions.CLIENT_WRITER_RETRIES.key()
- + " to non-zero when using the idempotent writer.
Otherwise "
- + "we cannot guarantee idempotence.");
+ String.format(
+ "Invalid retries configuration for idempotent
writer. Must set %s to non-zero (current value: %d) when using the idempotent
writer. Otherwise we cannot guarantee idempotence",
+ ConfigOptions.CLIENT_WRITER_RETRIES.key(),
retries));
}
return retries;
}