zch93 commented on code in PR #26933:
URL: https://github.com/apache/flink/pull/26933#discussion_r2293777671
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java:
##########
@@ -41,6 +46,39 @@
+ "operator-coordinator-state-size-in-bytes
BIGINT NOT NULL, "
+ "operator-total-size-in-bytes BIGINT NOT
NULL>"))
public class SavepointMetadataTableFunction extends TableFunction<Row> {
+
+ public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
Review Comment:
nice to keep the definition near the code. do you think we should also mark
that the function should not be pre-calculated by the optimizer? i mean to make
it clear that flink must always run it at query time, not try to be clever and
“pre-compute” it
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java:
##########
@@ -20,70 +20,46 @@
import org.apache.flink.annotation.Experimental;
import org.apache.flink.state.table.SavepointMetadataTableFunction;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import
org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.module.Module;
-import org.apache.flink.table.types.inference.TypeStrategies;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.flink.table.functions.FunctionKind.TABLE;
-
/** Module of state in Flink. */
@Experimental
public class StateModule implements Module {
public static final String IDENTIFIER = "state";
- public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
- BuiltInFunctionDefinition.newBuilder()
- .name("savepoint_metadata")
- .kind(TABLE)
-
.runtimeClass(SavepointMetadataTableFunction.class.getName())
- .outputTypeStrategy(
- TypeStrategies.explicit(
- DataTypes.ROW(
- DataTypes.FIELD(
- "checkpoint-id",
DataTypes.BIGINT().notNull()),
- DataTypes.FIELD("operator-name",
DataTypes.STRING()),
- DataTypes.FIELD("operator-uid",
DataTypes.STRING()),
- DataTypes.FIELD(
- "operator-uid-hash",
-
DataTypes.STRING().notNull()),
- DataTypes.FIELD(
- "operator-parallelism",
- DataTypes.INT().notNull()),
- DataTypes.FIELD(
- "operator-max-parallelism",
- DataTypes.INT().notNull()),
- DataTypes.FIELD(
-
"operator-subtask-state-count",
- DataTypes.INT().notNull()),
- DataTypes.FIELD(
-
"operator-coordinator-state-size-in-bytes",
-
DataTypes.BIGINT().notNull()),
- DataTypes.FIELD(
-
"operator-total-size-in-bytes",
-
DataTypes.BIGINT().notNull()))))
- .build();
-
public static final StateModule INSTANCE = new StateModule();
private final Map<String, BuiltInFunctionDefinition> normalizedFunctions;
private final Set<String> functionNamesWithInternal;
private final Set<String> functionNamesWithoutInternal;
private StateModule() {
- final List<BuiltInFunctionDefinition> definitions =
- Collections.singletonList(SAVEPOINT_METADATA);
+ final List<BuiltInFunctionDefinition> definitions = new ArrayList<>();
+
+ definitions.add(SavepointMetadataTableFunction.SAVEPOINT_METADATA);
+ ServiceLoader.load(DynamicBuiltInFunctionDefinitionFactory.class)
+ .iterator()
+ .forEachRemaining(
+ f -> {
+ if (f.factoryIdentifier().startsWith(IDENTIFIER)) {
Review Comment:
👍 on the spi
nit question: can `startsWith(IDENTIFIER)` accidentally match something like
`stateful-x`? maybe we should only accept plugins that say “i belong to the
state module” in a very clear way (like `state.kafka_offsets`)
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java:
##########
@@ -20,70 +20,46 @@
import org.apache.flink.annotation.Experimental;
import org.apache.flink.state.table.SavepointMetadataTableFunction;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import
org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.module.Module;
-import org.apache.flink.table.types.inference.TypeStrategies;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.flink.table.functions.FunctionKind.TABLE;
-
/** Module of state in Flink. */
@Experimental
public class StateModule implements Module {
public static final String IDENTIFIER = "state";
- public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
- BuiltInFunctionDefinition.newBuilder()
- .name("savepoint_metadata")
- .kind(TABLE)
-
.runtimeClass(SavepointMetadataTableFunction.class.getName())
- .outputTypeStrategy(
- TypeStrategies.explicit(
- DataTypes.ROW(
- DataTypes.FIELD(
- "checkpoint-id",
DataTypes.BIGINT().notNull()),
- DataTypes.FIELD("operator-name",
DataTypes.STRING()),
- DataTypes.FIELD("operator-uid",
DataTypes.STRING()),
- DataTypes.FIELD(
- "operator-uid-hash",
-
DataTypes.STRING().notNull()),
- DataTypes.FIELD(
- "operator-parallelism",
- DataTypes.INT().notNull()),
- DataTypes.FIELD(
- "operator-max-parallelism",
- DataTypes.INT().notNull()),
- DataTypes.FIELD(
-
"operator-subtask-state-count",
- DataTypes.INT().notNull()),
- DataTypes.FIELD(
-
"operator-coordinator-state-size-in-bytes",
-
DataTypes.BIGINT().notNull()),
- DataTypes.FIELD(
-
"operator-total-size-in-bytes",
-
DataTypes.BIGINT().notNull()))))
- .build();
-
public static final StateModule INSTANCE = new StateModule();
private final Map<String, BuiltInFunctionDefinition> normalizedFunctions;
private final Set<String> functionNamesWithInternal;
private final Set<String> functionNamesWithoutInternal;
private StateModule() {
- final List<BuiltInFunctionDefinition> definitions =
- Collections.singletonList(SAVEPOINT_METADATA);
+ final List<BuiltInFunctionDefinition> definitions = new ArrayList<>();
+
+ definitions.add(SavepointMetadataTableFunction.SAVEPOINT_METADATA);
Review Comment:
maybe i misunderstood something, but what if two plugins register the same
function name? if two providers define `get_offsets`, which one wins? maybe we
can deduplicate names before normalization to handle that, and separately
consider `Collections.unmodifiableList(definitions)` after building to avoid
accidental mutation. wdyt?
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java:
##########
@@ -20,70 +20,46 @@
import org.apache.flink.annotation.Experimental;
import org.apache.flink.state.table.SavepointMetadataTableFunction;
-import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import
org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.module.Module;
-import org.apache.flink.table.types.inference.TypeStrategies;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.flink.table.functions.FunctionKind.TABLE;
-
/** Module of state in Flink. */
@Experimental
public class StateModule implements Module {
public static final String IDENTIFIER = "state";
- public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
- BuiltInFunctionDefinition.newBuilder()
- .name("savepoint_metadata")
- .kind(TABLE)
-
.runtimeClass(SavepointMetadataTableFunction.class.getName())
- .outputTypeStrategy(
- TypeStrategies.explicit(
- DataTypes.ROW(
- DataTypes.FIELD(
- "checkpoint-id",
DataTypes.BIGINT().notNull()),
- DataTypes.FIELD("operator-name",
DataTypes.STRING()),
- DataTypes.FIELD("operator-uid",
DataTypes.STRING()),
- DataTypes.FIELD(
- "operator-uid-hash",
-
DataTypes.STRING().notNull()),
- DataTypes.FIELD(
- "operator-parallelism",
- DataTypes.INT().notNull()),
- DataTypes.FIELD(
- "operator-max-parallelism",
- DataTypes.INT().notNull()),
- DataTypes.FIELD(
-
"operator-subtask-state-count",
- DataTypes.INT().notNull()),
- DataTypes.FIELD(
-
"operator-coordinator-state-size-in-bytes",
-
DataTypes.BIGINT().notNull()),
- DataTypes.FIELD(
-
"operator-total-size-in-bytes",
-
DataTypes.BIGINT().notNull()))))
- .build();
-
public static final StateModule INSTANCE = new StateModule();
Review Comment:
nit: this is not part of this pr, but it seems like the constructor does
classpath scanning, and that is fine if the module is loaded once, but worth a
note: if someone repeatedly constructs modules in tests, scanning happens each
time
##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/StateModuleTest.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the savepoint SQL reader. */
+public class StateModuleTest {
+ @Test
+ public void testDynamicBuiltinFunction() throws Exception {
+ Configuration config = new Configuration();
+ config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
Review Comment:
do you think we should add a test with streaming mode too and add a negative
test for name collisions?
##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/module/ExampleDynamicBuiltInFunctionDefinitionFactory.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table.module;
+
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import
org.apache.flink.table.functions.DynamicBuiltInFunctionDefinitionFactory;
+
+import java.util.List;
+
+public class ExampleDynamicBuiltInFunctionDefinitionFactory
Review Comment:
api looks minimal and clear.
nit: maybe we can standardize the factory identifier to `<module>.<provider>`
and document that function names returned must be unique within the module
(and what happens otherwise)
nit: maybe we can add an optional `ModuleContext` param (classloader,
config). not needed now, but helps if a provider later needs context without
breaking the spi
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]