LadyForest commented on code in PR #22821:
URL: https://github.com/apache/flink/pull/22821#discussion_r1245293108


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ProcedureNotExistException.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/** Exception for trying to operate on a procedure that doesn't exist. */
+@PublicEvolving
+public class ProcedureNotExistException extends Exception {
+    private static final String MSG = "Procedure %s does not exist in Catalog %s.";

Review Comment:
   Nit: add a new line here.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -505,6 +507,20 @@ void alterPartition(
     CatalogFunction getFunction(ObjectPath functionPath)
             throws FunctionNotExistException, CatalogException;
 
+    /**
+     * Get the procedure. Procedure name should be handled in a case insensitive way.

Review Comment:
   Nit (suggested by Grammarly):
   `Get the procedure. The procedure name should be handled in a case-insensitive way.`



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ProcedureHint.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.annotation;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.table.types.inference.TypeInference;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Repeatable;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A hint that influences the reflection-based extraction of input types and output types for
+ * constructing the {@link TypeInference} logic of a {@link Procedure}.
+ *
+ * <p>One or more annotations can be declared on top of a {@link Procedure} class or individually
+ * for each {@code call()} method for overloading function signatures. All hint parameters are
+ * optional. If a parameter is not defined, the default reflection-based extraction is used. Hint
+ * parameters defined on top of a {@link Procedure} class are inherited by all {@code call()}
+ * methods. The {@link Procedure} should always return T[], so the {@link DataTypeHint} for output

Review Comment:
   Nit: in a typical dev process, we first decide what type to return and then create the corresponding type hint, not vice versa. Suggested wording:
   
   `{@link DataTypeHint} should always hint the output type T, since the {@link Procedure} returns T[].`
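
   A minimal sketch of that ordering, not taken from the PR: the return type `String[]` is chosen first, and the hint then names only the element type T. `ElementTypeHint` and `Context` are hypothetical stand-ins for `DataTypeHint` and `ProcedureContext`, so the snippet compiles without Flink on the classpath.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class OutputHintSketch {

    // Hypothetical stand-in for Flink's @DataTypeHint (assumption, not the real annotation).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface ElementTypeHint {
        String value();
    }

    // Hypothetical stand-in for ProcedureContext.
    public interface Context {}

    // The method returns T[] (here String[]); the hint describes T, not the array.
    public static class ListSnapshots {
        @ElementTypeHint("STRING")
        public String[] call(Context context, String tableName) {
            return new String[] {tableName + "-1", tableName + "-2"};
        }
    }

    // Finds the first public method named "call" on the given class.
    public static Method findCall(Class<?> procedureClass) {
        for (Method method : procedureClass.getMethods()) {
            if (method.getName().equals("call")) {
                return method;
            }
        }
        throw new IllegalArgumentException("No public call method on " + procedureClass);
    }

    public static void main(String[] args) {
        Method call = findCall(ListSnapshots.class);
        // Element type of the returned array, taken from the method signature.
        System.out.println(call.getReturnType().getComponentType().getSimpleName());
        // Element type named by the hint.
        System.out.println(call.getAnnotation(ElementTypeHint.class).value());
    }
}
```

   Both printed values describe the element type, which is the relationship the reworded Javadoc sentence is meant to convey.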



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/procedures/Procedure.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.procedures;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Base interface representing a stored procedure that can be executed by Flink. An stored procedure
+ * accepts zero, one, or multiple input parameters and then return the execution result of the
+ * stored procedure.
+ *
+ * <p>The behavior of {@link Procedure} can be defined by implements a custom call method. An call
+ * method must be declared publicly, not static, and named <code>call</code>. Call methods can also
+ * be overloaded by implementing multiple methods named <code>call</code>. Currently, it doesn't
+ * allows users to custom their own procedure, the customer {@link Procedure} can only be provided
+ * by {@link Catalog}. To provide {@link Procedure}, {@link Catalog} must implement {@link
+ * Catalog#getProcedure(ObjectPath)}.
+ *
+ * <p>When calling a stored procedure, Flink will always pass the <code>
+ * org.apache.flink.table.procedure.ProcedureContext</code> which provides
+ * StreamExecutionEnvironment currently as the first parameter of the <code>call</code> method. So,
+ * the custom <code>call</code> method must accept the <code>
+ * org.apache.flink.table.procedure.ProcedureContext
+ * </code> as the first parameter, and the other parameters of the <code>call</code> method are the
+ * actual parameter of the stored procedure.
+ *
+ * <p>By default, input and output data types are automatically extracted using reflection. The
+ * input arguments are derived from one or more {@code call()} methods. If the reflective
+ * information is not sufficient, it can be supported and enriched with {@link DataTypeHint} and
+ * {@link ProcedureHint}. If it's used to hint input arguments, it should only hint the input

Review Comment:
   Nit: it's better to state the subject explicitly here:
   `If {@link ProcedureHint} is used to hint input arguments, it should only hint the input`



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/procedures/Procedure.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.procedures;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Base interface representing a stored procedure that can be executed by Flink. An stored procedure
+ * accepts zero, one, or multiple input parameters and then return the execution result of the
+ * stored procedure.
+ *
+ * <p>The behavior of {@link Procedure} can be defined by implements a custom call method. An call
+ * method must be declared publicly, not static, and named <code>call</code>. Call methods can also
+ * be overloaded by implementing multiple methods named <code>call</code>. Currently, it doesn't
+ * allows users to custom their own procedure, the customer {@link Procedure} can only be provided
+ * by {@link Catalog}. To provide {@link Procedure}, {@link Catalog} must implement {@link
+ * Catalog#getProcedure(ObjectPath)}.
+ *
+ * <p>When calling a stored procedure, Flink will always pass the <code>
+ * org.apache.flink.table.procedure.ProcedureContext</code> which provides
+ * StreamExecutionEnvironment currently as the first parameter of the <code>call</code> method. So,
+ * the custom <code>call</code> method must accept the <code>
+ * org.apache.flink.table.procedure.ProcedureContext
+ * </code> as the first parameter, and the other parameters of the <code>call</code> method are the
+ * actual parameter of the stored procedure.
+ *
+ * <p>By default, input and output data types are automatically extracted using reflection. The
+ * input arguments are derived from one or more {@code call()} methods. If the reflective
+ * information is not sufficient, it can be supported and enriched with {@link DataTypeHint} and
+ * {@link ProcedureHint}. If it's used to hint input arguments, it should only hint the input
+ * arguments that start from the second argument since the first argument is always <code>
+ * ProcedureContext</code> which doesn't need to be annotated with data type hint.
+ *
+ * <p>Note: The return type of the {@code call()} method should always be T[] where T can be atomic
+ * type, Row, Pojo. An atomic type will be implicitly wrapped into a row consisting of one field.

Review Comment:
   Nit: `where T can be an atomic type, Row, or Pojo.`



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/procedures/Procedure.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.procedures;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Base interface representing a stored procedure that can be executed by Flink. An stored procedure
+ * accepts zero, one, or multiple input parameters and then return the execution result of the
+ * stored procedure.
+ *
+ * <p>The behavior of {@link Procedure} can be defined by implements a custom call method. An call
+ * method must be declared publicly, not static, and named <code>call</code>. Call methods can also
+ * be overloaded by implementing multiple methods named <code>call</code>. Currently, it doesn't
+ * allows users to custom their own procedure, the customer {@link Procedure} can only be provided

Review Comment:
   Nit: `custom` -> `customize` and `customer` -> `custom`, i.e. `allows users to customize their own procedure, the custom`



##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/procedure/ProcedureContext.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.procedure;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** A context to provide necessary context used by stored procedure. */
+@PublicEvolving
+public interface ProcedureContext {
+
+    /**
+     * Return the StreamExecutionEnvironment where the procedure is called. For every procedure

Review Comment:
   Nit (suggested by Grammarly):
   ```
   Return the {@link StreamExecutionEnvironment} where the procedure is called. Flink will create a new {@link StreamExecutionEnvironment} based on the current configuration and pass it to the procedure for every procedure call. The procedure can safely modify the passed {@link StreamExecutionEnvironment}, as it won't be leaked outside.
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/procedures/Procedure.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.procedures;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Base interface representing a stored procedure that can be executed by Flink. An stored procedure
+ * accepts zero, one, or multiple input parameters and then return the execution result of the
+ * stored procedure.
+ *
+ * <p>The behavior of {@link Procedure} can be defined by implements a custom call method. An call
+ * method must be declared publicly, not static, and named <code>call</code>. Call methods can also
+ * be overloaded by implementing multiple methods named <code>call</code>. Currently, it doesn't
+ * allows users to custom their own procedure, the customer {@link Procedure} can only be provided
+ * by {@link Catalog}. To provide {@link Procedure}, {@link Catalog} must implement {@link
+ * Catalog#getProcedure(ObjectPath)}.
+ *
+ * <p>When calling a stored procedure, Flink will always pass the <code>
+ * org.apache.flink.table.procedure.ProcedureContext</code> which provides
+ * StreamExecutionEnvironment currently as the first parameter of the <code>call</code> method. So,
+ * the custom <code>call</code> method must accept the <code>
+ * org.apache.flink.table.procedure.ProcedureContext
+ * </code> as the first parameter, and the other parameters of the <code>call</code> method are the
+ * actual parameter of the stored procedure.
+ *
+ * <p>By default, input and output data types are automatically extracted using reflection. The
+ * input arguments are derived from one or more {@code call()} methods. If the reflective
+ * information is not sufficient, it can be supported and enriched with {@link DataTypeHint} and
+ * {@link ProcedureHint}. If it's used to hint input arguments, it should only hint the input
+ * arguments that start from the second argument since the first argument is always <code>
+ * ProcedureContext</code> which doesn't need to be annotated with data type hint.
+ *
+ * <p>Note: The return type of the {@code call()} method should always be T[] where T can be atomic
+ * type, Row, Pojo. An atomic type will be implicitly wrapped into a row consisting of one field.
+ * Also, the {@link DataTypeHint} for output data type is used to hint T.
+ *
+ * <p>The following examples with pseudocode show how to write a stored procedure:
+ *
+ * <pre>{@code
+ * // a stored procedure that tries to rewrite data files for iceberg, it accept STRING
+ * // and return an array of explicit ROW < STRING, STRING >.
+ * class IcebergRewriteDataFilesProcedure implements Procedure {
+ *   public @DataTypeHint("ROW< rewritten_data_files_count STRING, added_data_files_count STRING >")
+ *          Row[] call(ProcedureContext procedureContext, String tableName) {
+ *     // plan for scanning the table to do rewriting
+ *     Table table = loadTable(tableName);
+ *     List<CombinedScanTask> combinedScanTasks = planScanTask(table);
+ *
+ *     // now, rewrite the files according to the planning task
+ *     StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
+ *     DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
+ *     RowDataRewriter rowDataRewriter =
+ *         new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
+ *     List<DataFile> addedDataFiles;
+ *     try {
+ *       addedDataFiles = rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
+ *     } catch (Exception e) {
+ *       throw new RuntimeException("Rewrite data file error.", e);
+ *     }
+ *
+ *     // replace the current files
+ *     List<DataFile> currentDataFiles = combinedScanTasks.stream()
+ *             .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
+ *             .collect(Collectors.toList());
+ *     replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId);
+ *
+ *     // return the result for rewriting
+ *     return new Row[] {Row.of(currentDataFiles.size(), addedDataFiles.size())};
+ *   }
+ * }
+ *
+ * // a stored procedure that accepts < STRING, LONG > and
+ * // return an array of STRING without datatype hint.
+ * class RollbackToSnapShotProcedure implements Procedure {
+ *   public String[] call(ProcedureContext procedureContext, String tableName, Long snapshot) {
+ *     Table table = loadTable(tableName);
+ *     Long previousSnapShotId = table.currentSnapshot();
+ *     table.manageSnapshots().rollbackTo(snapshotId).commit();
+ *     return new String[] {
+ *             "previous_snapshot_id: " + previousSnapShotId,
+ *             "current_snapshot_id " + snapshot
+ *     };
+ *   }
+ * }
+ * }</pre>
+ *
+ * <p>In the API, a stored procedure can be used as follows:

Review Comment:
   Nit: `In the API` sounds a little odd. Did you mean "Towards the API"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
