This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ae09a0dbf09 [FLINK-37407][state] Add savepoint metadata SQL built-in 
process function
ae09a0dbf09 is described below

commit ae09a0dbf09d0cbfaab3c3843e9f615fab5a2095
Author: Gabor Somogyi <[email protected]>
AuthorDate: Mon Apr 7 17:27:40 2025 +0200

    [FLINK-37407][state] Add savepoint metadata SQL built-in process function
---
 docs/content/docs/libs/state_processor_api.md      |  22 ++++
 flink-libraries/flink-state-processing-api/pom.xml |   7 ++
 .../state/table/SavepointConnectorOptions.java     |   2 +-
 .../table/SavepointDataStreamScanProvider.java     |   2 +-
 .../state/table/SavepointDynamicTableSource.java   |   2 +-
 .../table/SavepointMetadataTableFunction.java      |  72 ++++++++++++
 .../flink/state/table/module/StateModule.java      | 123 +++++++++++++++++++++
 .../state/table/module/StateModuleFactory.java     |  52 +++++++++
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../table/SavepointDynamicTableSourceTest.java     |   2 +-
 .../SavepointMetadataDynamicTableSourceTest.java   |  61 ++++++++++
 11 files changed, 342 insertions(+), 4 deletions(-)

diff --git a/docs/content/docs/libs/state_processor_api.md 
b/docs/content/docs/libs/state_processor_api.md
index d30bf2a5e9e..3a211182bc5 100644
--- a/docs/content/docs/libs/state_processor_api.md
+++ b/docs/content/docs/libs/state_processor_api.md
@@ -515,6 +515,28 @@ Before you interrogate state using the table API, make 
sure to review our [Flink
 
 IMPORTANT NOTE: State Table API only supports keyed state.
 
+### Metadata
+
+The following SQL table function allows users to read the metadata of 
savepoints and checkpoints:
+```SQL
+LOAD MODULE state;
+SELECT * FROM savepoint_metadata('/root/dir/of/checkpoint-data/chk-1');
+```
+
+The new table function creates a table with the following fixed schema:
+
+| Key                                      | Data type       | Description                                                                                                                                                                                    |
+|------------------------------------------|-----------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| checkpoint-id                            | BIGINT NOT NULL | Checkpoint ID.                                                                                                                                                                                 |
+| operator-name                            | STRING          | Operator Name.                                                                                                                                                                                 |
+| operator-uid                             | STRING          | Operator UID.                                                                                                                                                                                  |
+| operator-uid-hash                        | STRING NOT NULL | Operator UID hash.                                                                                                                                                                             |
+| operator-parallelism                     | INT NOT NULL    | Parallelism of the operator.                                                                                                                                                                   |
+| operator-max-parallelism                 | INT NOT NULL    | Maximum parallelism of the operator.                                                                                                                                                           |
+| operator-subtask-state-count             | INT NOT NULL    | Number of operator subtask states. It represents the state partition count divided by the operator's parallelism and might be 0 if the state is not partitioned (for example broadcast source). |
+| operator-coordinator-state-size-in-bytes | BIGINT NOT NULL | The operator's coordinator state size in bytes, or zero if no coordinator state.                                                                                                               |
+| operator-total-size-in-bytes             | BIGINT NOT NULL | Total operator state size in bytes.                                                                                                                                                            |
+
 ### Keyed State
 
 [Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" 
>}}#keyed-state), also known as partitioned state, is any state that is 
partitioned relative to a key.
diff --git a/flink-libraries/flink-state-processing-api/pom.xml 
b/flink-libraries/flink-state-processing-api/pom.xml
index 691c30a9ef4..9eaf4b416a2 100644
--- a/flink-libraries/flink-state-processing-api/pom.xml
+++ b/flink-libraries/flink-state-processing-api/pom.xml
@@ -54,6 +54,13 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-runtime</artifactId>
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptions.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptions.java
index 1c596ad4b49..56a3f4064cb 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptions.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptions.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.description.TextElement;
 
 import static org.apache.flink.configuration.description.TextElement.code;
 
-/** Options for the state connector. */
+/** Options for the savepoint connector. */
 @PublicEvolving
 public class SavepointConnectorOptions {
 
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
index b2b12f2d201..61cd7c0342f 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
@@ -43,7 +43,7 @@ import javax.naming.ConfigurationException;
 
 import java.util.List;
 
-/** State data stream scan provider. */
+/** Savepoint data stream scan provider. */
 public class SavepointDataStreamScanProvider implements DataStreamScanProvider 
{
     @Nullable private final String stateBackendType;
     private final String statePath;
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java
index 14847a95860..0af28fd83e3 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java
@@ -66,7 +66,7 @@ public class SavepointDynamicTableSource implements 
ScanTableSource {
 
     @Override
     public String asSummaryString() {
-        return "State Table Source";
+        return "Savepoint Table Source";
     }
 
     @Override
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java
new file mode 100644
index 00000000000..adf170320bd
--- /dev/null
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.state.api.runtime.SavepointLoader;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+@Internal
+@FunctionHint(
+        output =
+                @DataTypeHint(
+                        "ROW<checkpoint-id BIGINT NOT NULL, "
+                                + "operator-name STRING, "
+                                + "operator-uid STRING, operator-uid-hash 
STRING NOT NULL, "
+                                + "operator-parallelism INT NOT NULL, "
+                                + "operator-max-parallelism INT NOT NULL, "
+                                + "operator-subtask-state-count INT NOT NULL, "
+                                + "operator-coordinator-state-size-in-bytes 
BIGINT NOT NULL, "
+                                + "operator-total-size-in-bytes BIGINT NOT 
NULL>"))
+public class SavepointMetadataTableFunction extends TableFunction<Row> {
+    public 
SavepointMetadataTableFunction(SpecializedFunction.SpecializedContext context) 
{}
+
+    public void eval(String savepointPath) {
+        try {
+            CheckpointMetadata checkpointMetadata =
+                    SavepointLoader.loadSavepointMetadata(savepointPath);
+
+            for (OperatorState operatorState : 
checkpointMetadata.getOperatorStates()) {
+                Row row = Row.withNames();
+                row.setField("checkpoint-id", 
checkpointMetadata.getCheckpointId());
+                row.setField("operator-name", 
operatorState.getOperatorName().orElse(null));
+                row.setField("operator-uid", 
operatorState.getOperatorUid().orElse(null));
+                row.setField("operator-uid-hash", 
operatorState.getOperatorID().toHexString());
+                row.setField("operator-parallelism", 
operatorState.getParallelism());
+                row.setField("operator-max-parallelism", 
operatorState.getMaxParallelism());
+                row.setField("operator-subtask-state-count", 
operatorState.getStates().size());
+                row.setField(
+                        "operator-coordinator-state-size-in-bytes",
+                        operatorState.getCoordinatorState() != null
+                                ? 
operatorState.getCoordinatorState().getStateSize()
+                                : 0L);
+                row.setField("operator-total-size-in-bytes", 
operatorState.getCheckpointedSize());
+                collect(row);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java
new file mode 100644
index 00000000000..c69a407dcf2
--- /dev/null
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModule.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table.module;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.state.table.SavepointMetadataTableFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.types.inference.TypeStrategies;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.functions.FunctionKind.TABLE;
+
+/** Module of state in Flink. */
+@Experimental
+public class StateModule implements Module {
+
+    public static final String IDENTIFIER = "state";
+
+    public static final BuiltInFunctionDefinition SAVEPOINT_METADATA =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("savepoint_metadata")
+                    .kind(TABLE)
+                    
.runtimeClass(SavepointMetadataTableFunction.class.getName())
+                    .outputTypeStrategy(
+                            TypeStrategies.explicit(
+                                    DataTypes.ROW(
+                                            DataTypes.FIELD(
+                                                    "checkpoint-id", 
DataTypes.BIGINT().notNull()),
+                                            DataTypes.FIELD("operator-name", 
DataTypes.STRING()),
+                                            DataTypes.FIELD("operator-uid", 
DataTypes.STRING()),
+                                            DataTypes.FIELD(
+                                                    "operator-uid-hash",
+                                                    
DataTypes.STRING().notNull()),
+                                            DataTypes.FIELD(
+                                                    "operator-parallelism",
+                                                    DataTypes.INT().notNull()),
+                                            DataTypes.FIELD(
+                                                    "operator-max-parallelism",
+                                                    DataTypes.INT().notNull()),
+                                            DataTypes.FIELD(
+                                                    
"operator-subtask-state-count",
+                                                    DataTypes.INT().notNull()),
+                                            DataTypes.FIELD(
+                                                    
"operator-coordinator-state-size-in-bytes",
+                                                    
DataTypes.BIGINT().notNull()),
+                                            DataTypes.FIELD(
+                                                    
"operator-total-size-in-bytes",
+                                                    
DataTypes.BIGINT().notNull()))))
+                    .build();
+
+    public static final StateModule INSTANCE = new StateModule();
+
+    private final Map<String, BuiltInFunctionDefinition> normalizedFunctions;
+    private final Set<String> functionNamesWithInternal;
+    private final Set<String> functionNamesWithoutInternal;
+
+    private StateModule() {
+        final List<BuiltInFunctionDefinition> definitions =
+                Collections.singletonList(SAVEPOINT_METADATA);
+        this.normalizedFunctions =
+                definitions.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        f -> 
f.getName().toUpperCase(Locale.ROOT),
+                                        Function.identity()));
+        this.functionNamesWithInternal =
+                definitions.stream()
+                        .map(BuiltInFunctionDefinition::getName)
+                        .collect(Collectors.toSet());
+        this.functionNamesWithoutInternal =
+                definitions.stream()
+                        .filter(f -> !f.isInternal())
+                        .map(BuiltInFunctionDefinition::getName)
+                        .collect(Collectors.toSet());
+    }
+
+    @Override
+    public Set<String> listFunctions() {
+        return listFunctions(false);
+    }
+
+    @Override
+    public Set<String> listFunctions(boolean includeHiddenFunctions) {
+        if (includeHiddenFunctions) {
+            return functionNamesWithInternal;
+        } else {
+            return functionNamesWithoutInternal;
+        }
+    }
+
+    @Override
+    public Optional<FunctionDefinition> getFunctionDefinition(String name) {
+        final String normalizedName = name.toUpperCase(Locale.ROOT);
+        return Optional.ofNullable(normalizedFunctions.get(normalizedName));
+    }
+}
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModuleFactory.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModuleFactory.java
new file mode 100644
index 00000000000..759487089fb
--- /dev/null
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/module/StateModuleFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table.module;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.ModuleFactory;
+import org.apache.flink.table.module.Module;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** Factory for {@link StateModule}. */
+@Internal
+public class StateModuleFactory implements ModuleFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return StateModule.IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Module createModule(Context context) {
+        return StateModule.INSTANCE;
+    }
+}
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-libraries/flink-state-processing-api/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 730e50e185e..c5e2715f26d 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-libraries/flink-state-processing-api/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+org.apache.flink.state.table.module.StateModuleFactory
 org.apache.flink.state.table.SavepointDynamicTableSourceFactory
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
index d8d3b14e25e..a742c1e619b 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
@@ -37,7 +37,7 @@ import java.util.stream.LongStream;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Unit tests for the state SQL reader. */
+/** Unit tests for the savepoint SQL reader. */
 public class SavepointDynamicTableSourceTest {
     @Test
     @SuppressWarnings("unchecked")
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java
new file mode 100644
index 00000000000..128166d8f57
--- /dev/null
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the savepoint metadata SQL reader. */
+public class SavepointMetadataDynamicTableSourceTest {
+    @Test
+    public void testReadMetadata() throws Exception {
+        Configuration config = new Configuration();
+        config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        tEnv.executeSql("LOAD MODULE state");
+        Table table =
+                tEnv.sqlQuery("SELECT * FROM 
savepoint_metadata('src/test/resources/table-state')");
+        List<Row> result = tEnv.toDataStream(table).executeAndCollect(100);
+        result.sort(Comparator.comparing(a -> ((String) 
a.getField("operator-uid-hash"))));
+
+        assertThat(result.size()).isEqualTo(7);
+        Iterator<Row> it = result.iterator();
+        assertThat(it.next().toString())
+                .isEqualTo(
+                        "+I[2, Source: broadcast-source, broadcast-source-uid, 
3a6f51704798c4f418be51bfb6813b77, 1, 128, 0, 0, 0]");
+        assertThat(it.next().toString())
+                .isEqualTo(
+                        "+I[2, keyed-broadcast-process, 
keyed-broadcast-process-uid, 413c1d6f88ee8627fe4b8bc533b4cf1b, 2, 128, 2, 0, 
4548]");
+    }
+}

Reply via email to