This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4f824fea3 [feature][api] add option validation for the ReadonlyConfig 
(#3417)
4f824fea3 is described below

commit 4f824fea36a1460f9031f0ea3672c848d5972a85
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Nov 15 10:38:54 2022 +0800

    [feature][api] add option validation for the ReadonlyConfig (#3417)
    
    * [feature][api] add option validation for the ReadonlyConfig
    
    * [chore] AbsolutelyRequiredOption rename to AbsolutelyRequiredOptions
---
 .../api/configuration/ReadonlyConfig.java          |   2 +-
 .../api/configuration/util/ConfigValidator.java    | 178 ++++++++++++++
 .../api/configuration/util/OptionRule.java         |  14 +-
 .../api/configuration/util/OptionUtil.java         |  43 ++++
 .../util/OptionValidationException.java            |   4 +
 .../api/configuration/util/RequiredOption.java     |  68 +++---
 .../seatunnel/api/configuration/OptionTest.java    |   1 -
 .../api/configuration/ReadableConfigTest.java      |   4 +-
 .../api/configuration/util/ConditionTest.java      |   1 -
 .../configuration/util/ConfigValidatorTest.java    | 267 +++++++++++++++++++++
 .../api/configuration/util/ExpressionTest.java     |   1 -
 .../sink/client/ClickhouseSinkFactory.java         |   2 +-
 .../seatunnel/redis/source/RedisSourceFactory.java |   2 +-
 13 files changed, 543 insertions(+), 44 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
index 32e8db273..e82fbdeac 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -97,7 +97,7 @@ public class ReadonlyConfig {
         for (int i = 0; i < keys.length; i++) {
             value = data.get(keys[i]);
             if (i < keys.length - 1) {
-                if (!((value instanceof Map))) {
+                if (!(value instanceof Map)) {
                     return Optional.empty();
                 } else {
                     data = (Map<String, Object>) value;
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
new file mode 100644
index 000000000..03e785f95
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import static 
org.apache.seatunnel.api.configuration.util.OptionUtil.getOptionKeys;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+public class ConfigValidator {
+    private final ReadonlyConfig config;
+
+    private ConfigValidator(ReadonlyConfig config) {
+        this.config = config;
+    }
+
+    public static ConfigValidator of(ReadonlyConfig config) {
+        return new ConfigValidator(config);
+    }
+
+    public void validate(OptionRule rule) {
+        Set<RequiredOption> requiredOptions = rule.getRequiredOptions();
+        for (RequiredOption requiredOption : requiredOptions) {
+            validate(requiredOption);
+        }
+    }
+
+    void validate(RequiredOption requiredOption) {
+        if (requiredOption instanceof 
RequiredOption.AbsolutelyRequiredOptions) {
+            validate((RequiredOption.AbsolutelyRequiredOptions) 
requiredOption);
+            return;
+        }
+        if (requiredOption instanceof RequiredOption.BundledRequiredOptions) {
+            validate((RequiredOption.BundledRequiredOptions) requiredOption);
+            return;
+        }
+        if (requiredOption instanceof RequiredOption.ExclusiveRequiredOptions) 
{
+            validate((RequiredOption.ExclusiveRequiredOptions) requiredOption);
+            return;
+        }
+        if (requiredOption instanceof 
RequiredOption.ConditionalRequiredOptions) {
+            validate((RequiredOption.ConditionalRequiredOptions) 
requiredOption);
+            return;
+        }
+        throw new UnsupportedOperationException(String.format("This type 
option(%s) of validation is not supported", requiredOption.getClass()));
+    }
+
+    private Set<Option<?>> getAbsentOptions(Set<Option<?>> requiredOption) {
+        Set<Option<?>> absent = new HashSet<>();
+        for (Option<?> option : requiredOption) {
+            if (!hasOption(option)) {
+                absent.add(option);
+            }
+        }
+        return absent;
+    }
+
+    void validate(RequiredOption.AbsolutelyRequiredOptions requiredOption) {
+        Set<Option<?>> absentOptions = 
getAbsentOptions(requiredOption.getRequiredOption());
+        if (absentOptions.size() == 0) {
+            return;
+        }
+        throw new OptionValidationException("There are unconfigured options, 
the options(%s) are required.", getOptionKeys(absentOptions));
+    }
+
+    boolean hasOption(Option<?> option) {
+        return config.getOptional(option).isPresent();
+    }
+
+    boolean validate(RequiredOption.BundledRequiredOptions 
bundledRequiredOptions) {
+        Set<Option<?>> bundledOptions = 
bundledRequiredOptions.getRequiredOption();
+        Set<Option<?>> present = new HashSet<>();
+        Set<Option<?>> absent = new HashSet<>();
+        for (Option<?> option : bundledOptions) {
+            if (hasOption(option)) {
+                present.add(option);
+            } else {
+                absent.add(option);
+            }
+        }
+        if (present.size() == bundledOptions.size()) {
+            return true;
+        }
+        if (absent.size() == bundledOptions.size()) {
+            return false;
+        }
+        throw new OptionValidationException("These options(%s) are bundled, 
must be present or absent together. The options present are: %s. The options 
absent are %s.",
+            getOptionKeys(bundledOptions), getOptionKeys(present), 
getOptionKeys(absent));
+    }
+
+    void validate(RequiredOption.ExclusiveRequiredOptions 
exclusiveRequiredOptions) {
+        Set<RequiredOption.BundledRequiredOptions> 
presentBundledRequiredOptions = new HashSet<>();
+        Set<Option<?>> presentOptions = new HashSet<>();
+        for (RequiredOption.BundledRequiredOptions bundledOptions : 
exclusiveRequiredOptions.getExclusiveBundledOptions()) {
+            if (validate(bundledOptions)) {
+                presentBundledRequiredOptions.add(bundledOptions);
+            }
+        }
+
+        for (Option<?> option : 
exclusiveRequiredOptions.getExclusiveOptions()) {
+            if (hasOption(option)) {
+                presentOptions.add(option);
+            }
+        }
+        int count = presentBundledRequiredOptions.size() + 
presentOptions.size();
+        if (count == 1) {
+            return;
+        }
+        if (count == 0) {
+            throw new OptionValidationException("There are unconfigured 
options, these options(%s) are mutually exclusive, allowing only one set(\"[] 
for a set\") of options to be configured.",
+                getOptionKeys(exclusiveRequiredOptions.getExclusiveOptions(), 
exclusiveRequiredOptions.getExclusiveBundledOptions()));
+        }
+        if (count > 1) {
+            throw new OptionValidationException("These options(%s) are 
mutually exclusive, allowing only one set(\"[] for a set\") of options to be 
configured.",
+                getOptionKeys(presentOptions, presentBundledRequiredOptions));
+        }
+    }
+
+    void validate(RequiredOption.ConditionalRequiredOptions 
conditionalRequiredOptions) {
+        Expression expression = conditionalRequiredOptions.getExpression();
+        boolean match = validate(expression);
+        if (!match) {
+            return;
+        }
+        Set<Option<?>> absentOptions = 
getAbsentOptions(conditionalRequiredOptions.getRequiredOption());
+        if (absentOptions.size() == 0) {
+            return;
+        }
+        throw new OptionValidationException("There are unconfigured options, 
the options(%s) are required because [%s] is true.",
+            getOptionKeys(absentOptions), expression.toString());
+    }
+
+    private boolean validate(Expression expression) {
+        Condition<?> condition = expression.getCondition();
+        boolean match = validate(condition);
+        if (!expression.hasNext()) {
+            return match;
+        }
+        if (expression.and()) {
+            return match && validate(expression.getNext());
+        } else {
+            return match || validate(expression.getNext());
+        }
+    }
+
+    private <T> boolean validate(Condition<T> condition) {
+        Option<T> option = condition.getOption();
+
+        boolean match = Objects.equals(condition.getExpectValue(), 
config.get(option));
+        if (!condition.hasNext()) {
+            return match;
+        }
+        if (condition.and()) {
+            return match && validate(condition.getNext());
+        } else {
+            return match || validate(condition.getNext());
+        }
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
index 861859b23..f4007f53e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionRule.java
@@ -138,8 +138,8 @@ public class OptionRule {
         public Builder required(Option<?>... options) {
             for (Option<?> option : options) {
                 verifyRequiredOptionDefaultValue(option);
-                
this.requiredOptions.add(RequiredOption.AbsolutelyRequiredOption.of(option));
             }
+            
this.requiredOptions.add(RequiredOption.AbsolutelyRequiredOptions.of(options));
             return this;
         }
 
@@ -157,6 +157,11 @@ public class OptionRule {
             return this;
         }
 
+        public Builder exclusive(RequiredOption.ExclusiveRequiredOptions 
exclusiveRequiredOptions) {
+            this.requiredOptions.add(exclusiveRequiredOptions);
+            return this;
+        }
+
         /**
          * Conditional options, These options are required if the {@link 
Option} == expectValue.
          */
@@ -182,8 +187,11 @@ public class OptionRule {
             return this;
         }
 
-        public Builder bundledRequired(Option<?>... requiredOptions) {
-            
this.requiredOptions.add(RequiredOption.BundledRequiredOptions.of(new 
HashSet<>(Arrays.asList(requiredOptions))));
+        /**
+         * Bundled options, must be present or absent together.
+         */
+        public Builder bundled(Option<?>... requiredOptions) {
+            
this.requiredOptions.add(RequiredOption.BundledRequiredOptions.of(requiredOptions));
             return this;
         }
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionUtil.java
index 7c6835efc..8027087c3 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionUtil.java
@@ -25,10 +25,53 @@ import org.apache.commons.lang3.StringUtils;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class OptionUtil {
 
+    private OptionUtil() {
+    }
+
+    public static String getOptionKeys(Set<Option<?>> options) {
+        StringBuilder builder = new StringBuilder();
+        boolean flag = false;
+        for (Option<?> option : options) {
+            if (flag) {
+                builder.append(", ");
+            }
+            builder.append("'")
+                .append(option.key())
+                .append("'");
+            flag = true;
+        }
+        return builder.toString();
+    }
+
+    public static String getOptionKeys(Set<Option<?>> options, 
Set<RequiredOption.BundledRequiredOptions> bundledOptions) {
+        Set<Set<Option<?>>> optionSets = new HashSet<>();
+        for (Option<?> option : options) {
+            optionSets.add(Collections.singleton(option));
+        }
+        for (RequiredOption.BundledRequiredOptions bundledOption : 
bundledOptions) {
+            optionSets.add(bundledOption.getRequiredOption());
+        }
+        boolean flag = false;
+        StringBuilder builder = new StringBuilder();
+        for (Set<Option<?>> optionSet : optionSets) {
+            if (flag) {
+                builder.append(", ");
+            }
+            builder.append("[")
+                .append(getOptionKeys(optionSet))
+                .append("]");
+            flag = true;
+        }
+        return builder.toString();
+    }
+
     public static List<Option<?>> getOptions(Class<?> clazz) throws 
InstantiationException, IllegalAccessException {
         Field[] fields = clazz.getDeclaredFields();
         List<Option<?>> options = new ArrayList<>();
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
index 2abe052d7..306a208fe 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/OptionValidationException.java
@@ -32,4 +32,8 @@ public class OptionValidationException extends 
SeaTunnelRuntimeException {
     public OptionValidationException(String message) {
         super(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, message);
     }
+
+    public OptionValidationException(String formatMessage, Object... args) {
+        super(String.format(formatMessage, args));
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
index acafaaa99..be28d60b4 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/RequiredOption.java
@@ -17,27 +17,38 @@
 
 package org.apache.seatunnel.api.configuration.util;
 
+import static 
org.apache.seatunnel.api.configuration.util.OptionUtil.getOptionKeys;
+
 import org.apache.seatunnel.api.configuration.Option;
 
+import lombok.Getter;
+
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
 
 public interface RequiredOption {
+
+    /**
+     * These options are mutually exclusive, allowing only one set of options 
to be configured.
+     */
+    @Getter
     class ExclusiveRequiredOptions implements RequiredOption {
+        private final Set<BundledRequiredOptions> exclusiveBundledOptions;
         private final Set<Option<?>> exclusiveOptions;
 
-        ExclusiveRequiredOptions(Set<Option<?>> exclusiveOptions) {
+        ExclusiveRequiredOptions(Set<BundledRequiredOptions> 
exclusiveBundledOptions, Set<Option<?>> exclusiveOptions) {
+            this.exclusiveBundledOptions = exclusiveBundledOptions;
             this.exclusiveOptions = exclusiveOptions;
         }
 
         public static ExclusiveRequiredOptions of(Option<?>... 
exclusiveOptions) {
-            return new ExclusiveRequiredOptions(new 
HashSet<>(Arrays.asList(exclusiveOptions)));
+            return ExclusiveRequiredOptions.of(new HashSet<>(), 
exclusiveOptions);
         }
 
-        public Set<Option<?>> getExclusiveOptions() {
-            return exclusiveOptions;
+        public static ExclusiveRequiredOptions of(Set<BundledRequiredOptions> 
exclusiveBundledOptions, Option<?>... exclusiveOptions) {
+            return new ExclusiveRequiredOptions(exclusiveBundledOptions, new 
HashSet<>(Arrays.asList(exclusiveOptions)));
         }
 
         @Override
@@ -54,39 +65,28 @@ public interface RequiredOption {
 
         @Override
         public int hashCode() {
-            return Objects.hash(exclusiveOptions);
-        }
-
-        static String getOptionKeys(Set<Option<?>> options) {
-            StringBuilder builder = new StringBuilder();
-            int i = 0;
-            for (Option<?> option : options) {
-                if (i > 0) {
-                    builder.append(", ");
-                }
-                builder.append("'")
-                    .append(option.key())
-                    .append("'");
-                i++;
-            }
-            return builder.toString();
+            return Objects.hash(exclusiveBundledOptions, exclusiveOptions);
         }
 
         @Override
         public String toString() {
-            return String.format("Exclusive required options: %s", 
getOptionKeys(exclusiveOptions));
+            return String.format("Exclusive required set options: %s", 
getOptionKeys(exclusiveOptions, exclusiveBundledOptions));
         }
     }
 
-    class AbsolutelyRequiredOption<T> implements RequiredOption {
-        private final Option<T> requiredOption;
+    /**
+     * The option is required.
+     */
+    class AbsolutelyRequiredOptions implements RequiredOption {
+        @Getter
+        private final Set<Option<?>> requiredOption;
 
-        AbsolutelyRequiredOption(Option<T> requiredOption) {
+        AbsolutelyRequiredOptions(Set<Option<?>> requiredOption) {
             this.requiredOption = requiredOption;
         }
 
-        public static <T> AbsolutelyRequiredOption<T> of(Option<T> 
requiredOption) {
-            return new AbsolutelyRequiredOption<>(requiredOption);
+        public static AbsolutelyRequiredOptions of(Option<?>... 
requiredOption) {
+            return new AbsolutelyRequiredOptions(new 
HashSet<>(Arrays.asList(requiredOption)));
         }
 
         @Override
@@ -94,10 +94,10 @@ public interface RequiredOption {
             if (this == obj) {
                 return true;
             }
-            if (!(obj instanceof AbsolutelyRequiredOption)) {
+            if (!(obj instanceof AbsolutelyRequiredOptions)) {
                 return false;
             }
-            AbsolutelyRequiredOption<?> that = (AbsolutelyRequiredOption<?>) 
obj;
+            AbsolutelyRequiredOptions that = (AbsolutelyRequiredOptions) obj;
             return Objects.equals(this.requiredOption, that.requiredOption);
         }
 
@@ -108,7 +108,7 @@ public interface RequiredOption {
 
         @Override
         public String toString() {
-            return String.format("Absolutely required option: '%s'", 
requiredOption.key());
+            return String.format("Absolutely required options: '%s'", 
getOptionKeys(requiredOption));
         }
     }
 
@@ -156,12 +156,12 @@ public interface RequiredOption {
 
         @Override
         public String toString() {
-            return String.format("Condition expression: %s, Required options: 
%s", expression, ExclusiveRequiredOptions.getOptionKeys(requiredOption));
+            return String.format("Condition expression: %s, Required options: 
%s", expression, getOptionKeys(requiredOption));
         }
     }
 
     /**
-     * All options must exist or not exist at the same time
+     * These options are bundled, must be present or absent together.
      */
     class BundledRequiredOptions implements RequiredOption {
         private final Set<Option<?>> requiredOption;
@@ -170,6 +170,10 @@ public interface RequiredOption {
             this.requiredOption = requiredOption;
         }
 
+        public static BundledRequiredOptions of(Option<?>... requiredOption) {
+            return new BundledRequiredOptions(new 
HashSet<>(Arrays.asList(requiredOption)));
+        }
+
         public static BundledRequiredOptions of(Set<Option<?>> requiredOption) 
{
             return new BundledRequiredOptions(requiredOption);
         }
@@ -197,7 +201,7 @@ public interface RequiredOption {
 
         @Override
         public String toString() {
-            return String.format("Both Required options: %s", 
ExclusiveRequiredOptions.getOptionKeys(requiredOption));
+            return String.format("Bundled Required options: %s", 
getOptionKeys(requiredOption));
         }
     }
 }
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/OptionTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/OptionTest.java
index c3dc57c42..0f9371d8a 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/OptionTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/OptionTest.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.api.configuration;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-@SuppressWarnings("MagicNumber")
 public class OptionTest {
     public static final Option<Integer> TEST_NUM = Options.key("option.num")
         .intType()
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
index 62c45a01b..54b672daa 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/ReadableConfigTest.java
@@ -34,9 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-@SuppressWarnings({
-    "MagicNumber", "checkstyle:StaticVariableName"
-})
+@SuppressWarnings("checkstyle:StaticVariableName")
 public class ReadableConfigTest {
     private static final String CONFIG_PATH = "/conf/option-test.conf";
     private static ReadonlyConfig config;
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConditionTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConditionTest.java
index 105b83078..e597ddc1d 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConditionTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConditionTest.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.configuration.OptionTest;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-@SuppressWarnings("MagicNumber")
 public class ConditionTest {
     private static final Condition<OptionTest.TestMode> TEST_CONDITION = 
Condition.of(TEST_MODE, OptionTest.TestMode.EARLIEST)
         .or(TEST_MODE, OptionTest.TestMode.LATEST)
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
new file mode 100644
index 000000000..54ba7df79
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.configuration.util;
+
+import static org.apache.seatunnel.api.configuration.OptionTest.TEST_MODE;
+import static 
org.apache.seatunnel.api.configuration.util.OptionRuleTest.TEST_PORTS;
+import static 
org.apache.seatunnel.api.configuration.util.OptionRuleTest.TEST_TIMESTAMP;
+import static 
org.apache.seatunnel.api.configuration.util.OptionRuleTest.TEST_TOPIC;
+import static 
org.apache.seatunnel.api.configuration.util.OptionRuleTest.TEST_TOPIC_PATTERN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.OptionTest;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class ConfigValidatorTest {
+    public static final Option<String> KEY_USERNAME =
+        Options.key("username")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("username of the Neo4j");
+
+    public static final Option<String> KEY_PASSWORD =
+        Options.key("password")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("password of the Neo4j");
+
+    public static final Option<String> KEY_BEARER_TOKEN =
+        Options.key("bearer-token")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("base64 encoded bearer token of the Neo4j. for 
Auth.");
+
+    public static final Option<String> KEY_KERBEROS_TICKET =
+        Options.key("kerberos-ticket")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("base64 encoded kerberos ticket of the Neo4j. for 
Auth.");
+
+    void validate(Map<String, Object> config, OptionRule rule) {
+        ConfigValidator.of(ReadonlyConfig.fromMap(config)).validate(rule);
+    }
+
+    @Test
+    public void testAbsolutelyRequiredOption() {
+        OptionRule rule = OptionRule.builder()
+            .required(TEST_PORTS, KEY_USERNAME, KEY_PASSWORD)
+            .build();
+        Map<String, Object> config = new HashMap<>();
+        Executable executable = () -> validate(config, rule);
+
+        // absent
+        config.put(TEST_PORTS.key(), "[9090]");
+        assertEquals("There are unconfigured options, the options('password', 
'username') are required.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+
+        config.put(KEY_USERNAME.key(), "asuka");
+        assertEquals("There are unconfigured options, the options('password') 
are required.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+
+        // all present
+        config.put(KEY_PASSWORD.key(), "saitou");
+        Assertions.assertDoesNotThrow(executable);
+    }
+
+    @Test
+    public void testBundledRequiredOptions() {
+        OptionRule rule = OptionRule.builder()
+            .bundled(KEY_USERNAME, KEY_PASSWORD).build();
+        Map<String, Object> config = new HashMap<>();
+        Executable executable = () -> validate(config, rule);
+
+        // case1: all absent
+        Assertions.assertDoesNotThrow(executable);
+
+        // case2: some present
+        config.put(KEY_USERNAME.key(), "asuka");
+        assertEquals("These options('password', 'username') are bundled, must 
be present or absent together." +
+                " The options present are: 'username'. The options absent are 
'password'.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+
+        // case2: all present
+        config.put(KEY_PASSWORD.key(), "saitou");
+        Assertions.assertDoesNotThrow(executable);
+    }
+
+    @Test
+    public void testSimpleExclusiveRequiredOptions() {
+        OptionRule rule = OptionRule.builder()
+            .exclusive(TEST_TOPIC_PATTERN, TEST_TOPIC)
+            .build();
+        Map<String, Object> config = new HashMap<>();
+        Executable executable = () -> validate(config, rule);
+
+        // all absent
+        assertEquals("There are unconfigured options, these 
options(['option.topic-pattern'], ['option.topic']) are mutually exclusive," +
+                " allowing only one set(\"[] for a set\") of options to be 
configured.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+
+        // only one present
+        config.put(TEST_TOPIC_PATTERN.key(), "asuka");
+        Assertions.assertDoesNotThrow(executable);
+
+        // present > 1
+        config.put(TEST_TOPIC.key(), "[\"saitou\"]");
+        assertEquals("These options(['option.topic-pattern'], 
['option.topic']) are mutually exclusive, " +
+                "allowing only one set(\"[] for a set\") of options to be 
configured.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+    }
+
+    @Test
+    public void testComplexExclusiveRequiredOptions() {
+        Set<RequiredOption.BundledRequiredOptions> exclusiveBundledOptions  = 
new HashSet<>();
+        
exclusiveBundledOptions.add(RequiredOption.BundledRequiredOptions.of(KEY_USERNAME,
 KEY_PASSWORD));
+        OptionRule rule = OptionRule.builder()
+            
.exclusive(RequiredOption.ExclusiveRequiredOptions.of(exclusiveBundledOptions, 
KEY_BEARER_TOKEN, KEY_KERBEROS_TICKET))
+            .build();
+
+        Map<String, Object> config = new HashMap<>();
+        Executable executable = () -> validate(config, rule);
+
+        // all absent
+        assertEquals("There are unconfigured options, these 
options(['kerberos-ticket'], ['password', 'username'], ['bearer-token']) are 
mutually exclusive," +
+                " allowing only one set(\"[] for a set\") of options to be 
configured.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+
+        // bundled option some present
+        config.put(KEY_USERNAME.key(), "asuka");
+        assertEquals("These options('password', 'username') are bundled, must 
be present or absent together." +
+                " The options present are: 'username'. The options absent are 
'password'.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+
+        // only one set options present
+        config.put(KEY_PASSWORD.key(), "saitou");
+        Assertions.assertDoesNotThrow(executable);
+
+        // tow set options present
+        config.put(KEY_BEARER_TOKEN.key(), "ashulin");
+        assertEquals("These options(['password', 'username'], 
['bearer-token']) are mutually exclusive," +
+                " allowing only one set(\"[] for a set\") of options to be 
configured.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+
+        // three set options present
+        config.put(KEY_KERBEROS_TICKET.key(), "zongwen");
+        assertEquals("These options(['kerberos-ticket'], ['password', 
'username'], ['bearer-token']) are mutually exclusive," +
+                " allowing only one set(\"[] for a set\") of options to be 
configured.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+    }
+
+    @Test
+    public void testSimpleConditionalRequiredOptionsWithDefaultValue() {
+        OptionRule rule = OptionRule.builder()
+            .conditional(TEST_MODE, OptionTest.TestMode.TIMESTAMP, 
TEST_TIMESTAMP)
+            .build();
+        Map<String, Object> config = new HashMap<>();
+        Executable executable = () -> validate(config, rule);
+
+        // Expression mismatch
+        Assertions.assertDoesNotThrow(executable);
+
+        // Expression match, and required options absent
+        config.put(TEST_MODE.key(), "timestamp");
+        assertEquals("There are unconfigured options, the 
options('option.timestamp') are required" +
+                " because ['option.mode' == TIMESTAMP] is true.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+
+        // Expression match, and required options all present
+        config.put(TEST_TIMESTAMP.key(), "564231238596789");
+        Assertions.assertDoesNotThrow(executable);
+
+        // Expression mismatch
+        config.put(TEST_MODE.key(), "EARLIEST");
+        Assertions.assertDoesNotThrow(executable);
+    }
+
+    @Test
+    public void testSimpleConditionalRequiredOptionsWithoutDefaultValue() {
+        OptionRule rule = OptionRule.builder()
+            .conditional(KEY_USERNAME, "ashulin", TEST_TIMESTAMP)
+            .build();
+        Map<String, Object> config = new HashMap<>();
+        Executable executable = () -> validate(config, rule);
+
+        // Expression mismatch
+        Assertions.assertDoesNotThrow(executable);
+
+        // Expression match, and required options absent
+        config.put(KEY_USERNAME.key(), "ashulin");
+        assertEquals("There are unconfigured options, the 
options('option.timestamp') are required" +
+                " because ['username' == ashulin] is true.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+
+        // Expression match, and required options all present
+        config.put(TEST_TIMESTAMP.key(), "564231238596789");
+        Assertions.assertDoesNotThrow(executable);
+
+        // Expression mismatch
+        config.put(KEY_USERNAME.key(), "asuka");
+        Assertions.assertDoesNotThrow(executable);
+    }
+
+    @Test
+    public void testComplexConditionalRequiredOptions() {
+        Expression expression = Expression.of(KEY_USERNAME, "ashulin")
+            .or(Expression.of(Condition.of(KEY_USERNAME, "asuka")
+                .and(KEY_PASSWORD, "saito")));
+        OptionRule rule = OptionRule.builder()
+            .conditional(expression, TEST_TIMESTAMP)
+            .build();
+        Map<String, Object> config = new HashMap<>();
+        Executable executable = () -> validate(config, rule);
+
+        // Expression mismatch
+        Assertions.assertDoesNotThrow(executable);
+
+        // 'username' == ashulin, and required options absent
+        config.put(KEY_USERNAME.key(), "ashulin");
+        assertEquals("There are unconfigured options, the 
options('option.timestamp') are required" +
+                " because ['username' == ashulin || ('username' == asuka && 
'password' == saito)] is true.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+
+        // Expression match, and required options all present
+        config.put(TEST_TIMESTAMP.key(), "564231238596789");
+        Assertions.assertDoesNotThrow(executable);
+
+        // Expression mismatch
+        config.put(KEY_USERNAME.key(), "asuka");
+        Assertions.assertDoesNotThrow(executable);
+
+        // 'username' == asuka && 'password' == saito
+        config.put(KEY_PASSWORD.key(), "saito");
+        Assertions.assertDoesNotThrow(executable);
+
+        // 'username' == asuka && 'password' == saito, and required options 
absent
+        config.remove(TEST_TIMESTAMP.key());
+        assertEquals("There are unconfigured options, the 
options('option.timestamp') are required" +
+                " because ['username' == ashulin || ('username' == asuka && 
'password' == saito)] is true.",
+            assertThrows(OptionValidationException.class, 
executable).getMessage());
+    }
+}
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ExpressionTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ExpressionTest.java
index 5d4416b47..e7971a1a3 100644
--- 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ExpressionTest.java
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ExpressionTest.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.configuration.OptionTest;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-@SuppressWarnings("MagicNumber")
 public class ExpressionTest {
     @Test
     public void testToString() {
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
index 01fc05ece..ca67b7056 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
@@ -45,6 +45,6 @@ public class ClickhouseSinkFactory implements 
TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder().required(HOST, DATABASE, TABLE)
             .optional(CLICKHOUSE_PREFIX, BULK_SIZE, SPLIT_MODE, FIELDS, 
SHARDING_KEY)
-            .bundledRequired(USERNAME, PASSWORD).build();
+            .bundled(USERNAME, PASSWORD).build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
index c26dce24c..1a20713e4 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
@@ -38,7 +38,7 @@ public class RedisSourceFactory implements TableSourceFactory 
{
             .required(RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY, 
RedisConfig.DATA_TYPE)
             .optional(RedisConfig.HASH_KEY_PARSE_MODE, RedisConfig.AUTH, 
RedisConfig.USER, RedisConfig.KEY_PATTERN)
             .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, 
RedisConfig.NODES)
-            .bundledRequired(RedisConfig.FORMAT, SeaTunnelSchema.SCHEMA)
+            .bundled(RedisConfig.FORMAT, SeaTunnelSchema.SCHEMA)
             .build();
     }
 }


Reply via email to