wwj6591812 commented on code in PR #4346:
URL: https://github.com/apache/paimon/pull/4346#discussion_r1807115312
##########
docs/content/spark/procedures.md:
##########
@@ -121,6 +121,19 @@ This section introduce all available spark procedures
about paimon.
CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1',
target_tag_name => 'tag2')
</td>
</tr>
+ <tr>
+ <td>replace_tag</td>
Review Comment:
I think update_tag may be better?
##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -576,6 +576,25 @@ public void renameTag(String tagName, String
targetTagName) {
tagManager().renameTag(tagName, targetTagName);
}
+ @Override
+ public void replaceTag(String tagName, @Nullable Duration timeRetained) {
+ Snapshot latestSnapshot = snapshotManager().latestSnapshot();
+ SnapshotNotExistException.checkNotNull(
+ latestSnapshot, "Cannot create tag because latest snapshot
doesn't exist.");
Review Comment:
Can not update tag because latest snapshot doesn't exist.
##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagTagProcedure.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.Table;
+
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+
+import java.time.Duration;
+
+/** A procedure to replace a tag. */
+public class ReplaceTagTagProcedure extends CreateOrReplaceTagBaseProcedure {
Review Comment:
UpdateTagProcedure
##########
paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java:
##########
@@ -103,7 +103,27 @@ public void createTag(
List<TagCallback> callbacks) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s'
is blank.", tagName);
+ checkArgument(!tagExists(tagName), "Tag name '%s' already exists.",
tagName);
+ createOrReplaceTag(snapshot, tagName, timeRetained, callbacks);
+ }
+
+ /** Replace a tag from given snapshot and save it in the storage. */
+ public void replaceTag(
+ Snapshot snapshot,
+ String tagName,
+ @Nullable Duration timeRetained,
+ List<TagCallback> callbacks) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s'
is blank.", tagName);
+ checkArgument(tagExists(tagName), "Tag name '%s' does not exist.",
tagName);
+ createOrReplaceTag(snapshot, tagName, timeRetained, callbacks);
+ }
+ public void createOrReplaceTag(
Review Comment:
createOrUpdateTag
##########
docs/content/spark/procedures.md:
##########
@@ -121,6 +121,19 @@ This section introduce all available spark procedures
about paimon.
CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1',
target_tag_name => 'tag2')
</td>
</tr>
+ <tr>
+ <td>replace_tag</td>
+ <td>
+ Replace an existing tag with new tag info. Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ <li>tag: name of the existed tag. Cannot be empty.</li>
+ <li>snapshot(Long): id of the snapshot which the tag is based
on.</li>
Review Comment:
1、snapshotId
2、It is optional.
##########
paimon-core/src/main/java/org/apache/paimon/table/Table.java:
##########
@@ -120,6 +120,12 @@ default String fullName() {
@Experimental
void renameTag(String tagName, String targetTagName);
+ @Experimental
+ void replaceTag(String tagName, Duration timeRetained);
Review Comment:
接口方法【replaceTag】必须使用javadoc注释
##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -576,6 +576,25 @@ public void renameTag(String tagName, String
targetTagName) {
tagManager().renameTag(tagName, targetTagName);
}
+ @Override
+ public void replaceTag(String tagName, @Nullable Duration timeRetained) {
+ Snapshot latestSnapshot = snapshotManager().latestSnapshot();
+ SnapshotNotExistException.checkNotNull(
+ latestSnapshot, "Cannot create tag because latest snapshot
doesn't exist.");
+ tagManager()
+ .replaceTag(latestSnapshot, tagName, timeRetained,
store().createTagCallbacks());
+ }
+
+ @Override
+ public void replaceTag(String tagName, long fromSnapshotId, @Nullable
Duration timeRetained) {
+ tagManager()
+ .replaceTag(
+ findSnapshot(fromSnapshotId),
+ tagName,
+ timeRetained,
+ store().createTagCallbacks());
Review Comment:
Here you should think this case:
If user update a tag, but he don't want call createTagCallbacks.
##########
docs/content/spark/procedures.md:
##########
@@ -121,6 +121,19 @@ This section introduce all available spark procedures
about paimon.
CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1',
target_tag_name => 'tag2')
</td>
</tr>
+ <tr>
+ <td>replace_tag</td>
+ <td>
+ Replace an existing tag with new tag info. Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ <li>tag: name of the existed tag. Cannot be empty.</li>
+ <li>snapshot(Long): id of the snapshot which the tag is based
on.</li>
+ <li>time_retained: The maximum time retained for the existing
tag.</li>
Review Comment:
2、It is optional.
##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java:
##########
@@ -18,71 +18,26 @@
package org.apache.paimon.spark.procedure;
-import org.apache.paimon.utils.TimeUtils;
+import org.apache.paimon.table.Table;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
import java.time.Duration;
-import static org.apache.spark.sql.types.DataTypes.LongType;
-import static org.apache.spark.sql.types.DataTypes.StringType;
-
/** A procedure to create a tag. */
-public class CreateTagProcedure extends BaseProcedure {
-
- private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- ProcedureParameter.required("table", StringType),
- ProcedureParameter.required("tag", StringType),
- ProcedureParameter.optional("snapshot", LongType),
- ProcedureParameter.optional("time_retained", StringType)
- };
-
- private static final StructType OUTPUT_TYPE =
- new StructType(
- new StructField[] {
- new StructField("result", DataTypes.BooleanType, true,
Metadata.empty())
- });
+public class CreateTagProcedure extends CreateOrReplaceTagBaseProcedure {
protected CreateTagProcedure(TableCatalog tableCatalog) {
Review Comment:
why not private?
##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagTagProcedure.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.Table;
+
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+
+import java.time.Duration;
+
+/** A procedure to replace a tag. */
+public class ReplaceTagTagProcedure extends CreateOrReplaceTagBaseProcedure {
+
+ protected ReplaceTagTagProcedure(TableCatalog tableCatalog) {
Review Comment:
why not private?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]