wenjin272 commented on code in PR #821:
URL: https://github.com/apache/flink-agents/pull/821#discussion_r3515265226


##########
runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluator.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.condition;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import dev.cel.runtime.CelEvaluationException;
+import dev.cel.runtime.CelRuntime;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.configuration.CelEvaluationFailurePolicy;
+import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Evaluates CEL condition expressions against event data. */
+public class CelConditionEvaluator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CelConditionEvaluator.class);
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Frozen after {@link #initPrograms}; cleared by {@link #close}. */
+    @Nullable private Map<String, CelRuntime.Program> programCache;
+
+    private final CelEvaluationFailurePolicy failurePolicy;
+
+    public CelConditionEvaluator() {
+        this(CelEvaluationFailurePolicy.WARN_AND_SKIP);
+    }
+
+    public CelConditionEvaluator(CelEvaluationFailurePolicy failurePolicy) {
+        this.failurePolicy = failurePolicy;
+    }
+
+    /** Pre-compiles {@code expressions} and freezes the cache. Nulls are 
skipped. */
+    public void initPrograms(Collection<CelExpression> expressions) {
+        Map<String, CelRuntime.Program> programs = new HashMap<>();
+        for (CelExpression expression : expressions) {
+            if (expression == null) {
+                continue;
+            }
+            String source = expression.source();
+            programs.computeIfAbsent(source, CelExpressionFacade::toProgram);
+        }
+        this.programCache = Collections.unmodifiableMap(programs);
+    }
+
+    public void close() {
+        programCache = null;
+    }
+
+    /** Evaluates {@code expression} (which must have been pre-compiled). Null 
returns true. */
+    public boolean evaluate(@Nullable CelExpression expression, Map<String, 
Object> activation) {
+        if (expression == null) {
+            return true;
+        }
+        String source = expression.source();
+        try {
+            CelRuntime.Program program = programCache.get(source);
+            if (program == null) {
+                throw new IllegalStateException(
+                        "CEL condition was not pre-compiled via 
initPrograms(): \""
+                                + source
+                                + "\"");
+            }
+            return evaluateProgram(source, program, activation);
+        } catch (CelEvaluationException e) {
+            if (failurePolicy == CelEvaluationFailurePolicy.FAIL) {
+                throw new IllegalStateException(
+                        "CEL condition evaluation failed for '" + source + 
"'", e);
+            }
+            LOG.warn("CEL condition evaluation failed for '{}', skipping 
action", source, e);
+            return false;
+        }
+    }
+
+    private boolean evaluateProgram(
+            String condition, CelRuntime.Program program, Map<String, Object> 
activation)
+            throws CelEvaluationException {
+        Object result = program.eval(activation);
+        if (result instanceof Boolean) {
+            return (Boolean) result;
+        }
+        String msg =
+                String.format(
+                        "CEL condition '%s' returned non-boolean type %s, 
treating as false",
+                        condition, result == null ? "null" : 
result.getClass().getName());
+        if (failurePolicy == CelEvaluationFailurePolicy.FAIL) {
+            throw new IllegalStateException(msg);
+        }
+        LOG.warn(msg);
+        return false;
+    }
+
+    /**
+     * Builds the CEL activation. Contract (mirror of Python {@code 
cel_facade}):
+     *
+     * <ul>
+     *   <li>{@code type} and {@code EventType} are framework-owned and always 
win.
+     *   <li>{@code attributes} holds the single-level merge of user data: 
{@code output.*} subkeys,
+     *       then root attribute fields, then {@code input.*} subkeys ({@code 
output > root > input}
+     *       on collision, via {@link Map#putIfAbsent}). Only one level is 
flattened — nested fields
+     *       stay nested ({@code mylist.name}, not {@code name}).
+     *   <li>Every merged attribute is also promoted to the activation top 
level, so conditions can
+     *       use bare identifiers ({@code score > 0.8}) without any AST 
rewriting. Framework keys
+     *       are never shadowed.
+     *   <li>{@code id} is the user-supplied {@code id} attribute when 
present, otherwise falls back
+     *       to the event UUID.
+     * </ul>
+     *
+     * <p>JSON-shaped strings auto-parse first; narrow numerics widen to 
long/double.
+     */
+    @SuppressWarnings("unchecked")
+    public Map<String, Object> createActivation(Event event) {
+        Map<String, Object> activation = new HashMap<>();
+        activation.put("type", event.getType());
+        activation.put("EventType", CelExpressionFacade.EVENT_TYPE_CONSTANTS);
+
+        Object normalizedAttrs = normalizeValue(event.getAttributes(), 0);
+        Map<String, Object> merged = new HashMap<>();
+        if (normalizedAttrs instanceof Map) {
+            Map<String, Object> attrs = (Map<String, Object>) normalizedAttrs;
+
+            // Precedence: output subkeys > root attributes > input subkeys 
(putIfAbsent keeps the
+            // earliest insertion). Root iteration includes the 
"input"/"output" maps themselves,
+            // so nested paths like input.region.width keep working.
+            Object outputObj = attrs.get("output");
+            if (outputObj instanceof Map) {
+                ((Map<String, Object>) outputObj).forEach(merged::putIfAbsent);
+            }
+            attrs.forEach(merged::putIfAbsent);
+            Object inputObj = attrs.get("input");
+            if (inputObj instanceof Map) {
+                ((Map<String, Object>) inputObj).forEach(merged::putIfAbsent);
+            }
+        }
+
+        activation.put("attributes", merged);
+        // Promote to top level for bare-identifier access; framework keys win 
on collision.
+        merged.forEach(activation::putIfAbsent);
+        // Event UUID only as fallback — a user-supplied id attribute takes 
precedence.
+        activation.putIfAbsent("id", event.getId().toString());
+
+        return activation;
+    }
+
+    /**
+     * Maximum recursion depth for {@link #normalizeValue}. Past this depth, 
strings are kept as
+     * plain strings rather than parsed as JSON (graceful degrade, mirror of 
Python {@code
+     * _MAX_NORMALIZE_DEPTH}). Prevents stack blow-up on adversarial nested 
JSON input.
+     */
+    static final int MAX_NORMALIZE_DEPTH = 16;
+
+    /** JSON-looking strings → Map/List; narrow numerics widened to 
long/double for CEL. */
+    @SuppressWarnings("unchecked")
+    private static Object normalizeValue(Object value, int depth) {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof String) {
+            // Depth cap reached — keep the raw string (mirrors Python's 
_MAX_NORMALIZE_DEPTH).
+            if (depth >= MAX_NORMALIZE_DEPTH) {
+                return value;
+            }
+            String s = ((String) value).trim();
+            if (s.length() >= 2
+                    && ((s.charAt(0) == '{' && s.charAt(s.length() - 1) == '}')
+                            || (s.charAt(0) == '[' && s.charAt(s.length() - 1) 
== ']'))) {
+                try {
+                    return normalizeValue(MAPPER.readValue(s, Object.class), 
depth + 1);
+                } catch (Exception ignored) {
+                    // Not valid JSON — fall through as plain string.
+                }
+            }
+            return value;
+        }
+        if (value instanceof Map) {
+            Map<String, Object> src = (Map<String, Object>) value;
+            Map<String, Object> dst = new HashMap<>(src.size());
+            for (Map.Entry<String, Object> entry : src.entrySet()) {
+                dst.put(entry.getKey(), normalizeValue(entry.getValue(), depth 
+ 1));
+            }
+            return dst;
+        }
+        if (value instanceof List) {
+            List<Object> src = (List<Object>) value;
+            List<Object> dst = new ArrayList<>(src.size());
+            for (Object item : src) {
+                dst.add(normalizeValue(item, depth + 1));
+            }
+            return dst;
+        }
+        if (value instanceof Byte || value instanceof Short || value 
instanceof Integer) {
+            return ((Number) value).longValue();
+        }
+        if (value instanceof Float) {
+            return ((Float) value).doubleValue();
+        }
+        if (value instanceof BigInteger) {
+            BigInteger bigInt = (BigInteger) value;
+            if (bigInt.bitLength() < 64) {
+                return bigInt.longValue();
+            }
+            throw new IllegalArgumentException(
+                    "CEL normalizeValue: BigInteger value overflows int64: " + 
bigInt);

Review Comment:
   An `IllegalArgumentException` is thrown here, but it is not caught anywhere. 
This behavior is inconsistent with the advertised functionality of 
`WARN_AND_SKIP`.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluator.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.condition;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import dev.cel.runtime.CelEvaluationException;
+import dev.cel.runtime.CelRuntime;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.configuration.CelEvaluationFailurePolicy;
+import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Evaluates CEL condition expressions against event data. */
+public class CelConditionEvaluator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CelConditionEvaluator.class);
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Frozen after {@link #initPrograms}; cleared by {@link #close}. */
+    @Nullable private Map<String, CelRuntime.Program> programCache;
+
+    private final CelEvaluationFailurePolicy failurePolicy;
+
+    public CelConditionEvaluator() {
+        this(CelEvaluationFailurePolicy.WARN_AND_SKIP);
+    }
+
+    public CelConditionEvaluator(CelEvaluationFailurePolicy failurePolicy) {
+        this.failurePolicy = failurePolicy;
+    }
+
+    /** Pre-compiles {@code expressions} and freezes the cache. Nulls are 
skipped. */
+    public void initPrograms(Collection<CelExpression> expressions) {
+        Map<String, CelRuntime.Program> programs = new HashMap<>();
+        for (CelExpression expression : expressions) {
+            if (expression == null) {
+                continue;
+            }
+            String source = expression.source();
+            programs.computeIfAbsent(source, CelExpressionFacade::toProgram);
+        }
+        this.programCache = Collections.unmodifiableMap(programs);
+    }
+
+    public void close() {
+        programCache = null;
+    }
+
+    /** Evaluates {@code expression} (which must have been pre-compiled). Null 
returns true. */
+    public boolean evaluate(@Nullable CelExpression expression, Map<String, 
Object> activation) {
+        if (expression == null) {
+            return true;
+        }
+        String source = expression.source();
+        try {
+            CelRuntime.Program program = programCache.get(source);
+            if (program == null) {
+                throw new IllegalStateException(
+                        "CEL condition was not pre-compiled via 
initPrograms(): \""
+                                + source
+                                + "\"");
+            }
+            return evaluateProgram(source, program, activation);
+        } catch (CelEvaluationException e) {
+            if (failurePolicy == CelEvaluationFailurePolicy.FAIL) {
+                throw new IllegalStateException(
+                        "CEL condition evaluation failed for '" + source + 
"'", e);
+            }
+            LOG.warn("CEL condition evaluation failed for '{}', skipping 
action", source, e);
+            return false;
+        }
+    }
+
+    private boolean evaluateProgram(
+            String condition, CelRuntime.Program program, Map<String, Object> 
activation)
+            throws CelEvaluationException {
+        Object result = program.eval(activation);
+        if (result instanceof Boolean) {
+            return (Boolean) result;
+        }
+        String msg =
+                String.format(
+                        "CEL condition '%s' returned non-boolean type %s, 
treating as false",
+                        condition, result == null ? "null" : 
result.getClass().getName());
+        if (failurePolicy == CelEvaluationFailurePolicy.FAIL) {
+            throw new IllegalStateException(msg);
+        }
+        LOG.warn(msg);
+        return false;
+    }
+
+    /**
+     * Builds the CEL activation. Contract (mirror of Python {@code 
cel_facade}):
+     *
+     * <ul>
+     *   <li>{@code type} and {@code EventType} are framework-owned and always 
win.
+     *   <li>{@code attributes} holds the single-level merge of user data: 
{@code output.*} subkeys,
+     *       then root attribute fields, then {@code input.*} subkeys ({@code 
output > root > input}
+     *       on collision, via {@link Map#putIfAbsent}). Only one level is 
flattened — nested fields
+     *       stay nested ({@code mylist.name}, not {@code name}).
+     *   <li>Every merged attribute is also promoted to the activation top 
level, so conditions can
+     *       use bare identifiers ({@code score > 0.8}) without any AST 
rewriting. Framework keys
+     *       are never shadowed.
+     *   <li>{@code id} is the user-supplied {@code id} attribute when 
present, otherwise falls back
+     *       to the event UUID.
+     * </ul>
+     *
+     * <p>JSON-shaped strings auto-parse first; narrow numerics widen to 
long/double.
+     */
+    @SuppressWarnings("unchecked")
+    public Map<String, Object> createActivation(Event event) {
+        Map<String, Object> activation = new HashMap<>();
+        activation.put("type", event.getType());
+        activation.put("EventType", CelExpressionFacade.EVENT_TYPE_CONSTANTS);
+
+        Object normalizedAttrs = normalizeValue(event.getAttributes(), 0);

Review Comment:
   All fields are included in the normalization process. Is it not possible to 
normalize only the attributes involved in conditional calculations here?



##########
dist/src/main/resources/META-INF/NOTICE:
##########
@@ -14,6 +14,8 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.18.2
 - com.fasterxml.jackson.module:jackson-module-kotlin:2.18.2
 - com.fasterxml:classmate:1.7.0
+- dev.cel:cel:0.12.0
+- dev.cel:protobuf:0.12.0

Review Comment:
   dev.cel:common:0.12.0
   dev.cel:compiler:0.12.0
   dev.cel:runtime:0.12.0
   dev.cel:v1alpha1:0.12.0



##########
plan/src/main/java/org/apache/flink/agents/plan/condition/ParsedCondition.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.plan.condition;
+
+import dev.cel.common.CelAbstractSyntaxTree;
+import dev.cel.common.CelOptions;
+import dev.cel.common.CelValidationException;
+import dev.cel.common.ast.CelExpr;
+import dev.cel.parser.CelParser;
+import dev.cel.parser.CelParserFactory;
+
+import java.util.Objects;
+
+/**
+ * A parsed {@code Action.triggerConditions} entry — either {@link TypeMatch} 
or {@link
+ * CelExpression}. {@link #classify} turns a raw entry string into one of the 
two.
+ */
+public interface ParsedCondition {
+
+    /** Original user-written entry string. */
+    String source();
+
+    /** Mirrors the runtime facade caps so a too-deep / too-long expression 
fails at classify. */
+    CelOptions CEL_OPTIONS =
+            CelOptions.current()
+                    .maxParseRecursionDepth(32)
+                    .maxExpressionCodePointSize(8_192)
+                    .build();
+
+    /**
+     * Parser with the custom {@code has()} macro and the same resource caps 
as the runtime facade
+     * parser.
+     */
+    CelParser CEL_PARSER =
+            CelParserFactory.standardCelParserBuilder()
+                    .setOptions(CEL_OPTIONS)
+                    .addMacros(CelMacroPolicy.HAS)
+                    .build();
+
+    /**
+     * Parses a {@code triggerConditions} entry: a non-reserved 
bare-identifier root becomes a
+     * {@link TypeMatch}, everything else a {@link CelExpression}.
+     */
+    static ParsedCondition classify(String source) {
+        if (source == null || source.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "ParsedCondition.classify: source must be non-null and 
non-empty");
+        }
+        CelAbstractSyntaxTree ast;
+        try {
+            ast = CEL_PARSER.parse(source).getAst();
+        } catch (CelValidationException e) {
+            throw new IllegalArgumentException(
+                    "Invalid CEL expression: \"" + source + "\" — " + 
e.getMessage(), e);
+        }
+        CelExpr root = ast.getExpr();
+        if (root.exprKind().getKind() == CelExpr.ExprKind.Kind.IDENT) {
+            String name = root.ident().name();
+            if (CelMacroPolicy.RESERVED_IDENTIFIERS.contains(name)) {
+                throw new IllegalArgumentException(
+                        "'"
+                                + name
+                                + "' is a CEL reserved keyword and cannot be 
used as an "
+                                + "event type name. Did you mean: @action(\""
+                                + name
+                                + " == 'xxx'\") or @action(\"attributes."
+                                + name
+                                + "\")?");
+            }
+            return new TypeMatch(name);

Review Comment:
   The current classifier effectively restricts direct event-type triggers to 
CEL identifiers. That may be an acceptable beta API constraint, but it should 
be made explicit rather
     than surfacing as a CEL parse side effect.
   
     For example, `@Action("com.example.OrderEvent")` or 
`@Action("order-created")` used to look like natural event-type strings, but 
now they are parsed as condition expressions /
     invalid CEL instead of being accepted as direct type triggers. If direct 
event-type triggers are intended to share the same string slot as condition 
expressions, please document
     that event type names used in `@Action(...)` must be valid condition 
identifiers and cannot contain `.`, `-`, `:`, etc. It would also be better to 
validate this explicitly at plan
     construction time and return a trigger-condition error message, instead of 
an `Invalid CEL expression` error.



##########
api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java:
##########
@@ -33,6 +33,17 @@ public class AgentConfigOptions {
     public static final ConfigOption<LoggerType> EVENT_LOGGER_TYPE =
             new ConfigOption<>("eventLoggerType", LoggerType.class, 
LoggerType.SLF4J);
 
+    /**
+     * Controls how the CEL condition evaluator handles runtime exceptions and 
non-Boolean results.
+     * Defaults to {@link CelEvaluationFailurePolicy#WARN_AND_SKIP} for 
streaming safety; set to
+     * {@link CelEvaluationFailurePolicy#FAIL} on strict-semantics pipelines 
to trigger failover.
+     */
+    public static final ConfigOption<CelEvaluationFailurePolicy> 
CEL_EVALUATION_FAILURE_POLICY =
+            new ConfigOption<>(
+                    "celEvaluationFailurePolicy",

Review Comment:
   I believe users cannot fully understand the purpose of this configuration 
item from its name. CEL is an implementation-level detail, and API-layer users 
do not need to understand how conditions are evaluated. It could be renamed to 
`action.trigger-condition.evaluate-failure-tolerance-strategy` or some one like 
this.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluator.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.condition;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import dev.cel.runtime.CelEvaluationException;
+import dev.cel.runtime.CelRuntime;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.configuration.CelEvaluationFailurePolicy;
+import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Evaluates CEL condition expressions against event data. */
+public class CelConditionEvaluator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CelConditionEvaluator.class);
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Frozen after {@link #initPrograms}; cleared by {@link #close}. */
+    @Nullable private Map<String, CelRuntime.Program> programCache;
+
+    private final CelEvaluationFailurePolicy failurePolicy;
+
+    public CelConditionEvaluator() {
+        this(CelEvaluationFailurePolicy.WARN_AND_SKIP);
+    }
+
+    public CelConditionEvaluator(CelEvaluationFailurePolicy failurePolicy) {
+        this.failurePolicy = failurePolicy;
+    }
+
+    /** Pre-compiles {@code expressions} and freezes the cache. Nulls are 
skipped. */
+    public void initPrograms(Collection<CelExpression> expressions) {
+        Map<String, CelRuntime.Program> programs = new HashMap<>();
+        for (CelExpression expression : expressions) {
+            if (expression == null) {
+                continue;
+            }
+            String source = expression.source();
+            programs.computeIfAbsent(source, CelExpressionFacade::toProgram);
+        }
+        this.programCache = Collections.unmodifiableMap(programs);
+    }
+
+    public void close() {
+        programCache = null;
+    }
+
+    /** Evaluates {@code expression} (which must have been pre-compiled). Null 
returns true. */
+    public boolean evaluate(@Nullable CelExpression expression, Map<String, 
Object> activation) {
+        if (expression == null) {
+            return true;
+        }
+        String source = expression.source();
+        try {
+            CelRuntime.Program program = programCache.get(source);
+            if (program == null) {
+                throw new IllegalStateException(
+                        "CEL condition was not pre-compiled via 
initPrograms(): \""
+                                + source
+                                + "\"");
+            }
+            return evaluateProgram(source, program, activation);
+        } catch (CelEvaluationException e) {
+            if (failurePolicy == CelEvaluationFailurePolicy.FAIL) {
+                throw new IllegalStateException(
+                        "CEL condition evaluation failed for '" + source + 
"'", e);
+            }
+            LOG.warn("CEL condition evaluation failed for '{}', skipping 
action", source, e);
+            return false;
+        }
+    }
+
+    private boolean evaluateProgram(
+            String condition, CelRuntime.Program program, Map<String, Object> 
activation)
+            throws CelEvaluationException {
+        Object result = program.eval(activation);
+        if (result instanceof Boolean) {
+            return (Boolean) result;
+        }
+        String msg =
+                String.format(
+                        "CEL condition '%s' returned non-boolean type %s, 
treating as false",
+                        condition, result == null ? "null" : 
result.getClass().getName());
+        if (failurePolicy == CelEvaluationFailurePolicy.FAIL) {
+            throw new IllegalStateException(msg);
+        }
+        LOG.warn(msg);
+        return false;
+    }
+
+    /**
+     * Builds the CEL activation. Contract (mirror of Python {@code 
cel_facade}):
+     *
+     * <ul>
+     *   <li>{@code type} and {@code EventType} are framework-owned and always 
win.
+     *   <li>{@code attributes} holds the single-level merge of user data: 
{@code output.*} subkeys,
+     *       then root attribute fields, then {@code input.*} subkeys ({@code 
output > root > input}
+     *       on collision, via {@link Map#putIfAbsent}). Only one level is 
flattened — nested fields
+     *       stay nested ({@code mylist.name}, not {@code name}).
+     *   <li>Every merged attribute is also promoted to the activation top 
level, so conditions can
+     *       use bare identifiers ({@code score > 0.8}) without any AST 
rewriting. Framework keys
+     *       are never shadowed.
+     *   <li>{@code id} is the user-supplied {@code id} attribute when 
present, otherwise falls back
+     *       to the event UUID.
+     * </ul>
+     *
+     * <p>JSON-shaped strings auto-parse first; narrow numerics widen to 
long/double.
+     */
+    @SuppressWarnings("unchecked")

Review Comment:
   The term `CEL Activation` requires other developers to understand CEL, which 
is unnecessary.
   
   ```
   /**
      * Builds the variable map used to evaluate a trigger condition for one 
event.
      *
      * <p>The returned map is passed to the condition expression engine. It 
always contains:
      *
      * <ul>
      *   <li>{@code type}: the event routing type, from {@link 
Event#getType()}.
      *   <li>{@code id}: the user-provided {@code attributes.id} when present; 
otherwise the event UUID.
      *   <li>{@code attributes}: normalized event attributes.
      *   <li>{@code EventType}: built-in event type constants, e.g. {@code 
EventType.InputEvent}.
      * </ul>
      *
      * <p>For convenience, event attributes are also exposed as top-level 
variables. For example,
      * an event attribute {@code score} can be referenced as either {@code 
score} or
      * {@code attributes.score}.
      *
      * <p>If the event contains nested {@code input} or {@code output} maps, 
their immediate keys are
      * also exposed as top-level attributes. When the same key appears in 
multiple places, precedence is:
      * {@code output} keys first, then root attributes, then {@code input} 
keys.
      *
      * <p>String values that look like JSON objects or arrays are parsed 
before evaluation, and narrow
      * Java numeric types are widened to {@code long} or {@code double}.
      */
   ```



##########
runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelExpressionFacade.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.condition;
+
+import dev.cel.common.CelAbstractSyntaxTree;
+import dev.cel.common.CelOptions;
+import dev.cel.common.CelValidationException;
+import dev.cel.common.ast.CelExpr;
+import dev.cel.common.navigation.CelNavigableAst;
+import dev.cel.common.types.CelType;
+import dev.cel.common.types.MapType;
+import dev.cel.common.types.SimpleType;
+import dev.cel.compiler.CelCompiler;
+import dev.cel.compiler.CelCompilerBuilder;
+import dev.cel.compiler.CelCompilerFactory;
+import dev.cel.parser.CelParser;
+import dev.cel.parser.CelParserFactory;
+import dev.cel.runtime.CelEvaluationException;
+import dev.cel.runtime.CelRuntime;
+import dev.cel.runtime.CelRuntimeFactory;
+import org.apache.flink.agents.api.EventType;
+import org.apache.flink.agents.plan.condition.CelMacroPolicy;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * CEL Parse → Validate → Check → Program pipeline. Compiled {@link 
CelRuntime.Program} instances
+ * are cached process-wide by source string.
+ */
+public final class CelExpressionFacade {

Review Comment:
   I suggest renaming it to `ConditionExpressionCompiler`, as the term "Facade" 
is rarely used and its meaning is unclear.
   
   I also suggest changing the Javadoc to: `Compiles trigger condition 
expressions into cached executable programs`



##########
api/src/main/java/org/apache/flink/agents/api/configuration/CelEvaluationFailurePolicy.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.configuration;
+
+/** Behaviour when a CEL trigger condition throws or returns a non-Boolean at 
evaluation time. */
+public enum CelEvaluationFailurePolicy {

Review Comment:
   I feel it's unnecessary to have a separate file, simply implement it as an 
inner class of `AgentConfigOptions`.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluator.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.condition;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import dev.cel.runtime.CelEvaluationException;
+import dev.cel.runtime.CelRuntime;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.configuration.CelEvaluationFailurePolicy;
+import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Evaluates CEL condition expressions against event data. */
+public class CelConditionEvaluator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CelConditionEvaluator.class);
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Frozen after {@link #initPrograms}; cleared by {@link #close}. */
+    @Nullable private Map<String, CelRuntime.Program> programCache;
+
+    private final CelEvaluationFailurePolicy failurePolicy;
+
+    public CelConditionEvaluator() {
+        this(CelEvaluationFailurePolicy.WARN_AND_SKIP);
+    }
+
+    public CelConditionEvaluator(CelEvaluationFailurePolicy failurePolicy) {
+        this.failurePolicy = failurePolicy;
+    }
+
+    /** Pre-compiles {@code expressions} and freezes the cache. Nulls are 
skipped. */
+    public void initPrograms(Collection<CelExpression> expressions) {
+        Map<String, CelRuntime.Program> programs = new HashMap<>();
+        for (CelExpression expression : expressions) {
+            if (expression == null) {
+                continue;
+            }
+            String source = expression.source();
+            programs.computeIfAbsent(source, CelExpressionFacade::toProgram);
+        }
+        this.programCache = Collections.unmodifiableMap(programs);
+    }
+
+    public void close() {
+        programCache = null;
+    }
+
+    /** Evaluates {@code expression} (which must have been pre-compiled). Null 
returns true. */
+    public boolean evaluate(@Nullable CelExpression expression, Map<String, 
Object> activation) {
+        if (expression == null) {
+            return true;
+        }
+        String source = expression.source();
+        try {
+            CelRuntime.Program program = programCache.get(source);
+            if (program == null) {
+                throw new IllegalStateException(
+                        "CEL condition was not pre-compiled via 
initPrograms(): \""
+                                + source
+                                + "\"");
+            }
+            return evaluateProgram(source, program, activation);
+        } catch (CelEvaluationException e) {
+            if (failurePolicy == CelEvaluationFailurePolicy.FAIL) {
+                throw new IllegalStateException(
+                        "CEL condition evaluation failed for '" + source + 
"'", e);
+            }
+            LOG.warn("CEL condition evaluation failed for '{}', skipping 
action", source, e);
+            return false;
+        }
+    }
+
+    private boolean evaluateProgram(
+            String condition, CelRuntime.Program program, Map<String, Object> 
activation)
+            throws CelEvaluationException {
+        Object result = program.eval(activation);
+        if (result instanceof Boolean) {
+            return (Boolean) result;
+        }
+        String msg =
+                String.format(
+                        "CEL condition '%s' returned non-boolean type %s, 
treating as false",
+                        condition, result == null ? "null" : 
result.getClass().getName());
+        if (failurePolicy == CelEvaluationFailurePolicy.FAIL) {
+            throw new IllegalStateException(msg);
+        }
+        LOG.warn(msg);
+        return false;
+    }
+
+    /**
+     * Builds the CEL activation. Contract (mirror of Python {@code 
cel_facade}):
+     *
+     * <ul>
+     *   <li>{@code type} and {@code EventType} are framework-owned and always 
win.
+     *   <li>{@code attributes} holds the single-level merge of user data: 
{@code output.*} subkeys,
+     *       then root attribute fields, then {@code input.*} subkeys ({@code 
output > root > input}
+     *       on collision, via {@link Map#putIfAbsent}). Only one level is 
flattened — nested fields
+     *       stay nested ({@code mylist.name}, not {@code name}).
+     *   <li>Every merged attribute is also promoted to the activation top 
level, so conditions can
+     *       use bare identifiers ({@code score > 0.8}) without any AST 
rewriting. Framework keys
+     *       are never shadowed.
+     *   <li>{@code id} is the user-supplied {@code id} attribute when 
present, otherwise falls back
+     *       to the event UUID.
+     * </ul>
+     *
+     * <p>JSON-shaped strings auto-parse first; narrow numerics widen to 
long/double.
+     */
+    @SuppressWarnings("unchecked")
+    public Map<String, Object> createActivation(Event event) {

Review Comment:
   `Activation` is a concept specific to CEL. People unfamiliar with CEL may 
not understand the purpose of this function based on its name alone. I suggest 
renaming it to `mapEventToTriggerConditionVariables` or something like this.
   
   Our naming and Javadoc directly expose `CEL` in many places. I believe that 
even in modules primarily intended for other developers, such as the 
plan/runtime module, we should strive to encapsulate CEL concepts within the 
expression evaluation internals. For example, rename `CelConditionEvaluator` to 
`ConditionEvaluator`, and `ActionWithCels` to `ActionWithConditions`.



##########
plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java:
##########
@@ -45,6 +50,12 @@ public class Action {
     private final Function exec;
     private final List<String> triggerConditions;
 
+    /**
+     * Transient cache of classified {@link #triggerConditions}: CEL AST isn't 
Kryo-serialisable, so
+     * it is rebuilt lazily after deserialization via {@link 
#parsedConditions()}.
+     */
+    private transient List<ParsedCondition> parsedConditions;

Review Comment:
   It appears that `parsedConditions` is designed to distinguish whether each 
condition is a `TypeMatch` or a `CelExpression`. The cache is stored within the 
`Action` to avoid repeated runtime checks.
   1. I suggest declaring this field as `final`.
   2. The current Javadoc reflects an earlier implementation where the CEL AST 
was stored. However, due to Kryo serialization issues, this was changed to 
store a String instead, rendering the documentation outdated. In my experience, 
such comments are often auto-generated by coding agents. It is unnecessary to 
document development pitfalls like Kryo serialization problems; instead, please 
simply describe the field’s role in the final solution.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/condition/CelConditionEvaluator.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.condition;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import dev.cel.runtime.CelEvaluationException;
+import dev.cel.runtime.CelRuntime;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.configuration.CelEvaluationFailurePolicy;
+import org.apache.flink.agents.plan.condition.ParsedCondition.CelExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Evaluates CEL condition expressions against event data. */
+public class CelConditionEvaluator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CelConditionEvaluator.class);
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Frozen after {@link #initPrograms}; cleared by {@link #close}. */
+    @Nullable private Map<String, CelRuntime.Program> programCache;
+
+    private final CelEvaluationFailurePolicy failurePolicy;
+
+    public CelConditionEvaluator() {
+        this(CelEvaluationFailurePolicy.WARN_AND_SKIP);
+    }
+
+    public CelConditionEvaluator(CelEvaluationFailurePolicy failurePolicy) {
+        this.failurePolicy = failurePolicy;
+    }
+
+    /** Pre-compiles {@code expressions} and freezes the cache. Nulls are 
skipped. */
+    public void initPrograms(Collection<CelExpression> expressions) {
+        Map<String, CelRuntime.Program> programs = new HashMap<>();
+        for (CelExpression expression : expressions) {
+            if (expression == null) {
+                continue;
+            }
+            String source = expression.source();
+            programs.computeIfAbsent(source, CelExpressionFacade::toProgram);
+        }
+        this.programCache = Collections.unmodifiableMap(programs);
+    }
+
+    public void close() {
+        programCache = null;
+    }
+
+    /** Evaluates {@code expression} (which must have been pre-compiled). Null 
returns true. */
+    public boolean evaluate(@Nullable CelExpression expression, Map<String, 
Object> activation) {
+        if (expression == null) {
+            return true;
+        }
+        String source = expression.source();
+        try {
+            CelRuntime.Program program = programCache.get(source);
+            if (program == null) {
+                throw new IllegalStateException(
+                        "CEL condition was not pre-compiled via 
initPrograms(): \""
+                                + source
+                                + "\"");
+            }
+            return evaluateProgram(source, program, activation);
+        } catch (CelEvaluationException e) {
+            if (failurePolicy == CelEvaluationFailurePolicy.FAIL) {
+                throw new IllegalStateException(
+                        "CEL condition evaluation failed for '" + source + 
"'", e);
+            }
+            LOG.warn("CEL condition evaluation failed for '{}', skipping 
action", source, e);
+            return false;
+        }
+    }
+
+    private boolean evaluateProgram(
+            String condition, CelRuntime.Program program, Map<String, Object> 
activation)
+            throws CelEvaluationException {
+        Object result = program.eval(activation);
+        if (result instanceof Boolean) {
+            return (Boolean) result;
+        }
+        String msg =
+                String.format(
+                        "CEL condition '%s' returned non-boolean type %s, 
treating as false",
+                        condition, result == null ? "null" : 
result.getClass().getName());
+        if (failurePolicy == CelEvaluationFailurePolicy.FAIL) {
+            throw new IllegalStateException(msg);
+        }
+        LOG.warn(msg);
+        return false;
+    }
+
+    /**
+     * Builds the CEL activation. Contract (mirror of Python {@code 
cel_facade}):
+     *
+     * <ul>
+     *   <li>{@code type} and {@code EventType} are framework-owned and always 
win.
+     *   <li>{@code attributes} holds the single-level merge of user data: 
{@code output.*} subkeys,
+     *       then root attribute fields, then {@code input.*} subkeys ({@code 
output > root > input}
+     *       on collision, via {@link Map#putIfAbsent}). Only one level is 
flattened — nested fields
+     *       stay nested ({@code mylist.name}, not {@code name}).
+     *   <li>Every merged attribute is also promoted to the activation top 
level, so conditions can
+     *       use bare identifiers ({@code score > 0.8}) without any AST 
rewriting. Framework keys
+     *       are never shadowed.
+     *   <li>{@code id} is the user-supplied {@code id} attribute when 
present, otherwise falls back
+     *       to the event UUID.
+     * </ul>
+     *
+     * <p>JSON-shaped strings auto-parse first; narrow numerics widen to 
long/double.
+     */
+    @SuppressWarnings("unchecked")
+    public Map<String, Object> createActivation(Event event) {
+        Map<String, Object> activation = new HashMap<>();
+        activation.put("type", event.getType());
+        activation.put("EventType", CelExpressionFacade.EVENT_TYPE_CONSTANTS);
+
+        Object normalizedAttrs = normalizeValue(event.getAttributes(), 0);
+        Map<String, Object> merged = new HashMap<>();
+        if (normalizedAttrs instanceof Map) {
+            Map<String, Object> attrs = (Map<String, Object>) normalizedAttrs;
+
+            // Precedence: output subkeys > root attributes > input subkeys 
(putIfAbsent keeps the
+            // earliest insertion). Root iteration includes the 
"input"/"output" maps themselves,
+            // so nested paths like input.region.width keep working.
+            Object outputObj = attrs.get("output");
+            if (outputObj instanceof Map) {
+                ((Map<String, Object>) outputObj).forEach(merged::putIfAbsent);
+            }
+            attrs.forEach(merged::putIfAbsent);
+            Object inputObj = attrs.get("input");
+            if (inputObj instanceof Map) {
+                ((Map<String, Object>) inputObj).forEach(merged::putIfAbsent);
+            }
+        }
+
+        activation.put("attributes", merged);
+        // Promote to top level for bare-identifier access; framework keys win 
on collision.
+        merged.forEach(activation::putIfAbsent);
+        // Event UUID only as fallback — a user-supplied id attribute takes 
precedence.
+        activation.putIfAbsent("id", event.getId().toString());
+
+        return activation;
+    }
+
+    /**
+     * Maximum recursion depth for {@link #normalizeValue}. Past this depth, 
strings are kept as
+     * plain strings rather than parsed as JSON (graceful degrade, mirror of 
Python {@code
+     * _MAX_NORMALIZE_DEPTH}). Prevents stack blow-up on adversarial nested 
JSON input.
+     */
+    static final int MAX_NORMALIZE_DEPTH = 16;
+
+    /** JSON-looking strings → Map/List; narrow numerics widened to 
long/double for CEL. */
+    @SuppressWarnings("unchecked")
+    private static Object normalizeValue(Object value, int depth) {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof String) {
+            // Depth cap reached — keep the raw string (mirrors Python's 
_MAX_NORMALIZE_DEPTH).

Review Comment:
   I don't see any python implementations related to _`MAX_NORMALIZE_DEPTH`. 
What means `mirrors Python's _MAX_NORMALIZE_DEPTH`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to