This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 787a981cc [flink] Improve Exception message for consumer without 
expire time
787a981cc is described below

commit 787a981cc2c009075d8c2aa5f3e2c66d5988aeeb
Author: Jingsong <[email protected]>
AuthorDate: Wed Nov 13 11:32:13 2024 +0800

    [flink] Improve Exception message for consumer without expire time
---
 .../java/org/apache/paimon/flink/source/FlinkSourceBuilder.java     | 4 +++-
 .../src/test/java/org/apache/paimon/flink/CatalogTableITCase.java   | 6 ++++--
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index ed94043c0..a648bfba6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -259,7 +259,9 @@ public class FlinkSourceBuilder {
         if (conf.contains(CoreOptions.CONSUMER_ID)
                 && !conf.contains(CoreOptions.CONSUMER_EXPIRATION_TIME)) {
             throw new IllegalArgumentException(
-                    "consumer.expiration-time should be specified when using 
consumer-id.");
+                    "You need to configure 'consumer.expiration-time' (ALTER 
TABLE) and restart your write job for it"
+                            + " to take effect, when you need consumer-id 
feature. This is to prevent consumers from leaving"
+                            + " too many snapshots that could pose a risk to 
the file system.");
         }
 
         if (sourceBounded) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index ba063248e..8a3e068a7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -972,7 +972,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                                         "SELECT * FROM T /*+ 
OPTIONS('consumer-id' = 'test-id') */ WHERE a = 1"))
                 .rootCause()
                 .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("consumer.expiration-time should be specified when 
using consumer-id.");
+                .hasMessageContaining(
+                        "You need to configure 'consumer.expiration-time' 
(ALTER TABLE) and restart your write job for it");
     }
 
     @Test
@@ -985,7 +986,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                                 streamSqlIter(
                                         "SELECT * FROM T /*+ 
OPTIONS('consumer-id'='test-id') */"))
                 .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("consumer.expiration-time should be specified when 
using consumer-id.");
+                .hasMessageContaining(
+                        "You need to configure 'consumer.expiration-time' 
(ALTER TABLE) and restart your write job for it");
     }
 
     @Test

Reply via email to