This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 787a981cc [flink] Improve Exception message for consumer without expire time
787a981cc is described below
commit 787a981cc2c009075d8c2aa5f3e2c66d5988aeeb
Author: Jingsong <[email protected]>
AuthorDate: Wed Nov 13 11:32:13 2024 +0800
[flink] Improve Exception message for consumer without expire time
---
.../java/org/apache/paimon/flink/source/FlinkSourceBuilder.java | 4 +++-
.../src/test/java/org/apache/paimon/flink/CatalogTableITCase.java | 6 ++++--
2 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index ed94043c0..a648bfba6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -259,7 +259,9 @@ public class FlinkSourceBuilder {
if (conf.contains(CoreOptions.CONSUMER_ID)
&& !conf.contains(CoreOptions.CONSUMER_EXPIRATION_TIME)) {
throw new IllegalArgumentException(
- "consumer.expiration-time should be specified when using
consumer-id.");
+ "You need to configure 'consumer.expiration-time' (ALTER
TABLE) and restart your write job for it"
+ + " to take effect, when you need consumer-id
feature. This is to prevent consumers from leaving"
+ + " too many snapshots that could pose a risk to
the file system.");
}
if (sourceBounded) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index ba063248e..8a3e068a7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -972,7 +972,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
"SELECT * FROM T /*+
OPTIONS('consumer-id' = 'test-id') */ WHERE a = 1"))
.rootCause()
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("consumer.expiration-time should be specified when
using consumer-id.");
+ .hasMessageContaining(
+ "You need to configure 'consumer.expiration-time'
(ALTER TABLE) and restart your write job for it");
}
@Test
@@ -985,7 +986,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
streamSqlIter(
"SELECT * FROM T /*+
OPTIONS('consumer-id'='test-id') */"))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("consumer.expiration-time should be specified when
using consumer-id.");
+ .hasMessageContaining(
+ "You need to configure 'consumer.expiration-time'
(ALTER TABLE) and restart your write job for it");
}
@Test