This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit b23c57f9aab8a1a72ad4a7c6ca2f03803701f3b9 Author: WenjinXie <[email protected]> AuthorDate: Fri Sep 26 15:18:42 2025 +0800 [runtime][java] Auto create StreamTableEnvironment when needed. --- .../agents/api/AgentsExecutionEnvironment.java | 35 +++++--- .../test/DataStreamTableIntegrationExample.java | 93 ++++++++++++++++++++++ .../agents/integration/test/ReActAgentExample.java | 3 +- .../integration/test/TableIntegrationExample.java | 4 +- .../runtime/env/RemoteExecutionEnvironment.java | 41 +++++++--- 5 files changed, 150 insertions(+), 26 deletions(-) diff --git a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java index 8efe6f4..df98e32 100644 --- a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java +++ b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java @@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import javax.annotation.Nullable; + import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.HashMap; @@ -63,10 +65,11 @@ public abstract class AgentsExecutionEnvironment { * * @param env Optional StreamExecutionEnvironment for remote execution. If null, a local * execution environment will be created. + * @param tEnv Optional StreamTableEnvironment for table/stream conversion; if null, one is created from env when needed. * @return AgentsExecutionEnvironment appropriate for the execution context.
*/ public static AgentsExecutionEnvironment getExecutionEnvironment( - StreamExecutionEnvironment env) { + StreamExecutionEnvironment env, @Nullable StreamTableEnvironment tEnv) { if (env == null) { // Return local execution environment for testing/development try { @@ -86,14 +89,31 @@ public abstract class AgentsExecutionEnvironment { "org.apache.flink.agents.runtime.env.RemoteExecutionEnvironment"); return (AgentsExecutionEnvironment) remoteEnvClass - .getDeclaredConstructor(StreamExecutionEnvironment.class) - .newInstance(env); + .getDeclaredConstructor( + StreamExecutionEnvironment.class, + StreamTableEnvironment.class) + .newInstance(env, tEnv); } catch (Exception e) { throw new RuntimeException("Failed to create RemoteExecutionEnvironment", e); } } } + /** + * Convenience method to get execution environment without a Flink StreamTableEnvironment. If + * a StreamTableEnvironment is needed during execution, it will be created automatically from + * the given StreamExecutionEnvironment. + * + * @param env Optional StreamExecutionEnvironment for remote execution. If null, a local + * execution environment will be created. + * + * @return AgentsExecutionEnvironment appropriate for the execution context. + */ + public static AgentsExecutionEnvironment getExecutionEnvironment( + StreamExecutionEnvironment env) { + return getExecutionEnvironment(env, null); + } + /** * Convenience method to get execution environment without Flink integration. * @@ -155,23 +175,20 @@ public abstract class AgentsExecutionEnvironment { * and processing it through agents. * * @param input Table to be processed by agents. - * @param tableEnv StreamTableEnvironment for table-to-stream conversion. * @param keySelector Optional KeySelector for extracting keys from table rows. * @param <K> Type of the key extracted by the KeySelector. * @return AgentBuilder for configuring the agent pipeline.
*/ - public abstract <K> AgentBuilder fromTable( - Table input, StreamTableEnvironment tableEnv, KeySelector<Object, K> keySelector); + public abstract <K> AgentBuilder fromTable(Table input, KeySelector<Object, K> keySelector); /** * Set input for agents from a Table without keying. * * @param input Table to be processed by agents. - * @param tableEnv StreamTableEnvironment for table-to-stream conversion. * @return AgentBuilder for configuring the agent pipeline. */ - public AgentBuilder fromTable(Table input, StreamTableEnvironment tableEnv) { - return fromTable(input, tableEnv, null); + public AgentBuilder fromTable(Table input) { + return fromTable(input, null); } /** diff --git a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamTableIntegrationExample.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamTableIntegrationExample.java new file mode 100644 index 0000000..625eb47 --- /dev/null +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/DataStreamTableIntegrationExample.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.agents.integration.test; + +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; + +public class DataStreamTableIntegrationExample { + /** Simple data class for the example. */ + public static class ItemData { + public final int id; + public final String name; + public final double value; + public int visit_count; + + public ItemData(int id, String name, double value) { + this.id = id; + this.name = name; + this.value = value; + this.visit_count = 0; + } + + @Override + public String toString() { + return String.format( + "ItemData{id=%d, name='%s', value=%.2f,visit_count=%d}", + id, name, value, visit_count); + } + } + + /** Key selector for extracting keys from ItemData. 
*/ + public static class ItemKeySelector + implements KeySelector<DataStreamIntegrationExample.ItemData, Integer> { + @Override + public Integer getKey(DataStreamIntegrationExample.ItemData item) { + return item.id; + } + } + + public static void main(String[] args) throws Exception { + // Create the execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + // Create input DataStream + DataStream<DataStreamIntegrationExample.ItemData> inputStream = + env.fromElements( + new DataStreamIntegrationExample.ItemData(1, "item1", 10.5), + new DataStreamIntegrationExample.ItemData(2, "item2", 20.0), + new DataStreamIntegrationExample.ItemData(3, "item3", 15.7), + new DataStreamIntegrationExample.ItemData(1, "item1_updated", 12.3), + new DataStreamIntegrationExample.ItemData(2, "item2_updated", 22.1), + new DataStreamIntegrationExample.ItemData(1, "item1_updated_again", 15.3)); + + // Create agents execution environment + AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + + // Define output schema + Schema outputSchema = Schema.newBuilder().column("f0", DataTypes.STRING()).build(); + + // Apply agent to the Table + Table outputTable = + agentsEnv + .fromDataStream( + inputStream, new DataStreamIntegrationExample.ItemKeySelector()) + .apply(new DataStreamAgent()) + .toTable(outputSchema); + + outputTable.execute().print(); + } +} diff --git a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java index fbce8fd..af99756 100644 --- a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/ReActAgentExample.java @@ -66,7 +66,7 @@ public class ReActAgentExample { // 
Create agents execution environment AgentsExecutionEnvironment agentsEnv = - AgentsExecutionEnvironment.getExecutionEnvironment(env); + AgentsExecutionEnvironment.getExecutionEnvironment(env, tableEnv); // register resource to agents execution environment. agentsEnv @@ -102,7 +102,6 @@ public class ReActAgentExample { agentsEnv .fromTable( inputTable, - tableEnv, (KeySelector<Object, Double>) value -> (Double) ((Row) value).getField("a")) .apply(agent) diff --git a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java index 30c4d17..89835d7 100644 --- a/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java +++ b/e2e-test/integration-test/src/main/java/org/apache/flink/agents/integration/test/TableIntegrationExample.java @@ -78,7 +78,7 @@ public class TableIntegrationExample { // Create agents execution environment AgentsExecutionEnvironment agentsEnv = - AgentsExecutionEnvironment.getExecutionEnvironment(env); + AgentsExecutionEnvironment.getExecutionEnvironment(env, tableEnv); // Define output schema Schema outputSchema = Schema.newBuilder().column("f0", DataTypes.STRING()).build(); @@ -86,7 +86,7 @@ public class TableIntegrationExample { // Apply agent to the Table Table outputTable = agentsEnv - .fromTable(inputTable, tableEnv, new RowKeySelector()) + .fromTable(inputTable, new RowKeySelector()) .apply(new TableAgent()) .toTable(outputSchema); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java b/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java index 096d7aa..8b2eb29 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java +++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/env/RemoteExecutionEnvironment.java @@ -34,6 +34,8 @@ import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import javax.annotation.Nullable; + import java.io.File; import java.util.HashMap; import java.util.List; @@ -48,17 +50,27 @@ import java.util.Map; public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment { private final StreamExecutionEnvironment env; + private @Nullable StreamTableEnvironment tEnv; private final AgentConfiguration config; public static final String FLINK_CONF_FILENAME = "config.yaml"; - public RemoteExecutionEnvironment(StreamExecutionEnvironment env) { + public RemoteExecutionEnvironment( + StreamExecutionEnvironment env, @Nullable StreamTableEnvironment tEnv) { this.env = env; + this.tEnv = tEnv; final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); this.config = loadAgentConfiguration(configDir); } + private StreamTableEnvironment getTableEnvironment() { + if (tEnv == null) { + tEnv = StreamTableEnvironment.create(env); + } + return tEnv; + } + @Override public AgentConfiguration getConfig() { return config; @@ -72,13 +84,13 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment { @Override public <T, K> AgentBuilder fromDataStream(DataStream<T> input, KeySelector<T, K> keySelector) { - return new RemoteAgentBuilder<>(input, keySelector, env, config, resources); + return new RemoteAgentBuilder<>(input, tEnv, keySelector, env, config, resources); } @Override - public <K> AgentBuilder fromTable( - Table input, StreamTableEnvironment tableEnv, KeySelector<Object, K> keySelector) { - return new RemoteAgentBuilder<>(input, tableEnv, keySelector, env, config, resources); + public <K> AgentBuilder fromTable(Table input, KeySelector<Object, K> keySelector) { + return new RemoteAgentBuilder<>( + input, getTableEnvironment(), 
keySelector, env, config, resources); } @Override @@ -110,7 +122,7 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment { private final DataStream<T> inputDataStream; private final KeySelector<T, K> keySelector; private final StreamExecutionEnvironment env; - private final StreamTableEnvironment tableEnv; + private @Nullable StreamTableEnvironment tableEnv; private final AgentConfiguration config; private final Map<ResourceType, Map<String, Object>> resources; @@ -120,6 +132,7 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment { // Constructor for DataStream input public RemoteAgentBuilder( DataStream<T> inputDataStream, + @Nullable StreamTableEnvironment tableEnv, KeySelector<T, K> keySelector, StreamExecutionEnvironment env, AgentConfiguration config, @@ -127,7 +140,7 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment { this.inputDataStream = inputDataStream; this.keySelector = keySelector; this.env = env; - this.tableEnv = null; + this.tableEnv = tableEnv; this.config = config; this.resources = resources; } @@ -149,6 +162,13 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment { this.resources = resources; } + private StreamTableEnvironment getTableEnvironment() { + if (tableEnv == null) { + tableEnv = StreamTableEnvironment.create(env); + } + return tableEnv; + } + @Override public AgentBuilder apply(Agent agent) { try { @@ -189,13 +209,8 @@ public class RemoteExecutionEnvironment extends AgentsExecutionEnvironment { @Override public Table toTable(Schema schema) { - if (tableEnv == null) { - throw new IllegalStateException( - "Table environment not available. Use fromTable() method to enable table output."); - } - DataStream<Object> dataStream = toDataStream(); - return tableEnv.fromDataStream(dataStream, schema); + return getTableEnvironment().fromDataStream(dataStream, schema); } } }
