This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit b23c57f9aab8a1a72ad4a7c6ca2f03803701f3b9
Author: WenjinXie <[email protected]>
AuthorDate: Fri Sep 26 15:18:42 2025 +0800

    [runtime][java] Auto create StreamTableEnvironment when need.
---
 .../agents/api/AgentsExecutionEnvironment.java     | 35 +++++---
 .../test/DataStreamTableIntegrationExample.java    | 93 ++++++++++++++++++++++
 .../agents/integration/test/ReActAgentExample.java |  3 +-
 .../integration/test/TableIntegrationExample.java  |  4 +-
 .../runtime/env/RemoteExecutionEnvironment.java    | 41 +++++++---
 5 files changed, 150 insertions(+), 26 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java 
b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
index 8efe6f4..df98e32 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
+import javax.annotation.Nullable;
+
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.HashMap;
@@ -63,10 +65,11 @@ public abstract class AgentsExecutionEnvironment {
      *
      * @param env Optional StreamExecutionEnvironment for remote execution. If 
null, a local
      *     execution environment will be created.
+     * @param tEnv Optional StreamTableEnvironment for table-to-stream 
conversion.
      * @return AgentsExecutionEnvironment appropriate for the execution 
context.
      */
     public static AgentsExecutionEnvironment getExecutionEnvironment(
-            StreamExecutionEnvironment env) {
+            StreamExecutionEnvironment env, @Nullable StreamTableEnvironment 
tEnv) {
         if (env == null) {
             // Return local execution environment for testing/development
             try {
@@ -86,14 +89,31 @@ public abstract class AgentsExecutionEnvironment {
                                 
"org.apache.flink.agents.runtime.env.RemoteExecutionEnvironment");
                 return (AgentsExecutionEnvironment)
                         remoteEnvClass
-                                
.getDeclaredConstructor(StreamExecutionEnvironment.class)
-                                .newInstance(env);
+                                .getDeclaredConstructor(
+                                        StreamExecutionEnvironment.class,
+                                        StreamTableEnvironment.class)
+                                .newInstance(env, tEnv);
             } catch (Exception e) {
                 throw new RuntimeException("Failed to create 
RemoteExecutionEnvironment", e);
             }
         }
     }
 
+    /**
+     * Convenience method to get execution environment without Flink 
StreamTableEnvironment. If
+     * StreamTableEnvironment is needed during execution, the environment will 
auto crate using
+     * StreamExecutionEnvironment.
+     *
+     * <p>* @param env Optional StreamExecutionEnvironment for remote 
execution. If null, a local
+     * execution environment will be created.
+     *
+     * @return Remote execution environment for testing and development.
+     */
+    public static AgentsExecutionEnvironment getExecutionEnvironment(
+            StreamExecutionEnvironment env) {
+        return getExecutionEnvironment(env, null);
+    }
+
     /**
      * Convenience method to get execution environment without Flink 
integration.
      *
@@ -155,23 +175,20 @@ public abstract class AgentsExecutionEnvironment {
      * and processing it through agents.
      *
      * @param input Table to be processed by agents.
-     * @param tableEnv StreamTableEnvironment for table-to-stream conversion.
      * @param keySelector Optional KeySelector for extracting keys from table 
rows.
      * @param <K> Type of the key extracted by the KeySelector.
      * @return AgentBuilder for configuring the agent pipeline.
      */
-    public abstract <K> AgentBuilder fromTable(
-            Table input, StreamTableEnvironment tableEnv, KeySelector<Object, 
K> keySelector);
+    public abstract <K> AgentBuilder fromTable(Table input, 
KeySelector<Object, K> keySelector);
 
     /**
      * Set input for agents from a Table without keying.
      *
      * @param input Table to be processed by agents.
-     * @param tableEnv StreamTableEnvironment for table-to-stream conversion.
      * @return AgentBuilder for configuring the agent pipeline.
      */
-    public AgentBuilder fromTable(Table input, StreamTableEnvironment 
tableEnv) {
-        return fromTable(input, tableEnv, null);
+    public AgentBuilder fromTable(Table input) {
+        return fromTable(input, null);
     }
 
     /**
diff --git 
a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamTableIntegrationExample.java
 
b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamTableIntegrationExample.java
new file mode 100644
index 0000000..625eb47
--- /dev/null
+++ 
b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamTableIntegrationExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integration.test;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+
+public class DataStreamTableIntegrationExample {
+    /** Simple data class for the example. */
+    public static class ItemData {
+        public final int id;
+        public final String name;
+        public final double value;
+        public int visit_count;
+
+        public ItemData(int id, String name, double value) {
+            this.id = id;
+            this.name = name;
+            this.value = value;
+            this.visit_count = 0;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "ItemData{id=%d, name='%s', value=%.2f,visit_count=%d}",
+                    id, name, value, visit_count);
+        }
+    }
+
+    /** Key selector for extracting keys from ItemData. */
+    public static class ItemKeySelector
+            implements KeySelector<DataStreamIntegrationExample.ItemData, 
Integer> {
+        @Override
+        public Integer getKey(DataStreamIntegrationExample.ItemData item) {
+            return item.id;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        // Create the execution environment
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        // Create input DataStream
+        DataStream<DataStreamIntegrationExample.ItemData> inputStream =
+                env.fromElements(
+                        new DataStreamIntegrationExample.ItemData(1, "item1", 
10.5),
+                        new DataStreamIntegrationExample.ItemData(2, "item2", 
20.0),
+                        new DataStreamIntegrationExample.ItemData(3, "item3", 
15.7),
+                        new DataStreamIntegrationExample.ItemData(1, 
"item1_updated", 12.3),
+                        new DataStreamIntegrationExample.ItemData(2, 
"item2_updated", 22.1),
+                        new DataStreamIntegrationExample.ItemData(1, 
"item1_updated_again", 15.3));
+
+        // Create agents execution environment
+        AgentsExecutionEnvironment agentsEnv =
+                AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+        // Define output schema
+        Schema outputSchema = Schema.newBuilder().column("f0", 
DataTypes.STRING()).build();
+
+        // Apply agent to the Table
+        Table outputTable =
+                agentsEnv
+                        .fromDataStream(
+                                inputStream, new 
DataStreamIntegrationExample.ItemKeySelector())
+                        .apply(new DataStreamAgent())
+                        .toTable(outputSchema);
+
+        outputTable.execute().print();
+    }
+}
diff --git 
a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java
 
b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java
index fbce8fd..af99756 100644
--- 
a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java
+++ 
b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java
@@ -66,7 +66,7 @@ public class ReActAgentExample {
 
         // Create agents execution environment
         AgentsExecutionEnvironment agentsEnv =
-                AgentsExecutionEnvironment.getExecutionEnvironment(env);
+                AgentsExecutionEnvironment.getExecutionEnvironment(env, 
tableEnv);
 
         // register resource to agents execution environment.
         agentsEnv
@@ -102,7 +102,6 @@ public class ReActAgentExample {
                 agentsEnv
                         .fromTable(
                                 inputTable,
-                                tableEnv,
                                 (KeySelector<Object, Double>)
                                         value -> (Double) ((Row) 
value).getField("a"))
                         .apply(agent)
diff --git 
a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java
 
b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java
index 30c4d17..89835d7 100644
--- 
a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java
+++ 
b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java
@@ -78,7 +78,7 @@ public class TableIntegrationExample {
 
         // Create agents execution environment
         AgentsExecutionEnvironment agentsEnv =
-                AgentsExecutionEnvironment.getExecutionEnvironment(env);
+                AgentsExecutionEnvironment.getExecutionEnvironment(env, 
tableEnv);
 
         // Define output schema
         Schema outputSchema = Schema.newBuilder().column("f0", 
DataTypes.STRING()).build();
@@ -86,7 +86,7 @@ public class TableIntegrationExample {
         // Apply agent to the Table
         Table outputTable =
                 agentsEnv
-                        .fromTable(inputTable, tableEnv, new RowKeySelector())
+                        .fromTable(inputTable, new RowKeySelector())
                         .apply(new TableAgent())
                         .toTable(outputSchema);
 
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
index 096d7aa..8b2eb29 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java
@@ -34,6 +34,8 @@ import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.util.HashMap;
 import java.util.List;
@@ -48,17 +50,27 @@ import java.util.Map;
 public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment {
 
     private final StreamExecutionEnvironment env;
+    private @Nullable StreamTableEnvironment tEnv;
 
     private final AgentConfiguration config;
 
     public static final String FLINK_CONF_FILENAME = "config.yaml";
 
-    public RemoteExecutionEnvironment(StreamExecutionEnvironment env) {
+    public RemoteExecutionEnvironment(
+            StreamExecutionEnvironment env, @Nullable StreamTableEnvironment 
tEnv) {
         this.env = env;
+        this.tEnv = tEnv;
         final String configDir = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
         this.config = loadAgentConfiguration(configDir);
     }
 
+    private StreamTableEnvironment getTableEnvironment() {
+        if (tEnv == null) {
+            tEnv = StreamTableEnvironment.create(env);
+        }
+        return tEnv;
+    }
+
     @Override
     public AgentConfiguration getConfig() {
         return config;
@@ -72,13 +84,13 @@ public class RemoteExecutionEnvironment extends 
AgentsExecutionEnvironment {
 
     @Override
     public <T, K> AgentBuilder fromDataStream(DataStream<T> input, 
KeySelector<T, K> keySelector) {
-        return new RemoteAgentBuilder<>(input, keySelector, env, config, 
resources);
+        return new RemoteAgentBuilder<>(input, tEnv, keySelector, env, config, 
resources);
     }
 
     @Override
-    public <K> AgentBuilder fromTable(
-            Table input, StreamTableEnvironment tableEnv, KeySelector<Object, 
K> keySelector) {
-        return new RemoteAgentBuilder<>(input, tableEnv, keySelector, env, 
config, resources);
+    public <K> AgentBuilder fromTable(Table input, KeySelector<Object, K> 
keySelector) {
+        return new RemoteAgentBuilder<>(
+                input, getTableEnvironment(), keySelector, env, config, 
resources);
     }
 
     @Override
@@ -110,7 +122,7 @@ public class RemoteExecutionEnvironment extends 
AgentsExecutionEnvironment {
         private final DataStream<T> inputDataStream;
         private final KeySelector<T, K> keySelector;
         private final StreamExecutionEnvironment env;
-        private final StreamTableEnvironment tableEnv;
+        private @Nullable StreamTableEnvironment tableEnv;
         private final AgentConfiguration config;
         private final Map<ResourceType, Map<String, Object>> resources;
 
@@ -120,6 +132,7 @@ public class RemoteExecutionEnvironment extends 
AgentsExecutionEnvironment {
         // Constructor for DataStream input
         public RemoteAgentBuilder(
                 DataStream<T> inputDataStream,
+                @Nullable StreamTableEnvironment tableEnv,
                 KeySelector<T, K> keySelector,
                 StreamExecutionEnvironment env,
                 AgentConfiguration config,
@@ -127,7 +140,7 @@ public class RemoteExecutionEnvironment extends 
AgentsExecutionEnvironment {
             this.inputDataStream = inputDataStream;
             this.keySelector = keySelector;
             this.env = env;
-            this.tableEnv = null;
+            this.tableEnv = tableEnv;
             this.config = config;
             this.resources = resources;
         }
@@ -149,6 +162,13 @@ public class RemoteExecutionEnvironment extends 
AgentsExecutionEnvironment {
             this.resources = resources;
         }
 
+        private StreamTableEnvironment getTableEnvironment() {
+            if (tableEnv == null) {
+                tableEnv = StreamTableEnvironment.create(env);
+            }
+            return tableEnv;
+        }
+
         @Override
         public AgentBuilder apply(Agent agent) {
             try {
@@ -189,13 +209,8 @@ public class RemoteExecutionEnvironment extends 
AgentsExecutionEnvironment {
 
         @Override
         public Table toTable(Schema schema) {
-            if (tableEnv == null) {
-                throw new IllegalStateException(
-                        "Table environment not available. Use fromTable() 
method to enable table output.");
-            }
-
             DataStream<Object> dataStream = toDataStream();
-            return tableEnv.fromDataStream(dataStream, schema);
+            return getTableEnvironment().fromDataStream(dataStream, schema);
         }
     }
 }

Reply via email to