This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ff64c8ca4 [hotfix] Improve spark procedures document format and 
duplicate code calls (#3716)
ff64c8ca4 is described below

commit ff64c8ca46c1e8593857eb15ef07c93045959b30
Author: Kerwin <[email protected]>
AuthorDate: Thu Jul 11 09:53:27 2024 +0800

    [hotfix] Improve spark procedures document format and duplicate code calls 
(#3716)
---
 docs/content/spark/procedures.md                   | 16 +++++-----
 .../java/org/apache/paimon/stats/Statistics.java   | 34 ++++++++++------------
 .../org/apache/paimon/stats/StatsFileHandler.java  |  2 +-
 .../apache/paimon/utils/FileStorePathFactory.java  | 10 +++----
 .../apache/paimon/utils/ScanParallelExecutor.java  |  2 +-
 5 files changed, 30 insertions(+), 34 deletions(-)

diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 43b82cb6c..d464881d6 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -49,8 +49,8 @@ This section introduce all available spark procedures about 
paimon.
             <li>max_concurrent_jobs: when sort compact is used, files in one 
partition are grouped and submitted as a single spark compact job. This 
parameter controls the maximum number of jobs that can be submitted 
simultaneously. The default value is 15.</li>
       </td>
       <td>
-         SET spark.sql.shuffle.partitions=10; --set the compact parallelism 
<br/>
-         CALL sys.compact(table => 'T', partitions => 'p=0;p=1',  
order_strategy => 'zorder', order_by => 'a,b') <br/>
+         SET spark.sql.shuffle.partitions=10; --set the compact parallelism 
<br/><br/>
+         CALL sys.compact(table => 'T', partitions => 'p=0;p=1',  
order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
          CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy 
=> 'zorder', order_by => 'a,b')
       </td>
     </tr>
@@ -87,7 +87,7 @@ This section introduce all available spark procedures about 
paimon.
       </td>
       <td>
          -- based on snapshot 10 with 1d <br/>
-         CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot 
=> 10, time_retained => '1 d') <br/>
+         CALL sys.create_tag(table => 'default.T', tag => 'my_tag', snapshot 
=> 10, time_retained => '1 d') <br/><br/>
          -- based on the latest snapshot <br/>
          CALL sys.create_tag(table => 'default.T', tag => 'my_tag')
       </td>
@@ -109,7 +109,7 @@ This section introduce all available spark procedures about 
paimon.
             <li>version: id of the snapshot or name of tag that will roll back 
to.</li>
       </td>
       <td>
-          CALL sys.rollback(table => 'default.T', version => 'my_tag')<br/>
+          CALL sys.rollback(table => 'default.T', version => 
'my_tag')<br/><br/>
           CALL sys.rollback(table => 'default.T', version => 10)
       </td>
     </tr>
@@ -146,9 +146,7 @@ This section introduce all available spark procedures about 
paimon.
             <li>database_or_table: empty or the target database name or the 
target table identifier, if you specify multiple tags, delimiter is ','</li>
       </td>
       <td>
-          CALL sys.repair('test_db.T')
-      </td>
-      <td>
+          CALL sys.repair('test_db.T')<br/><br/>
           CALL sys.repair('test_db.T,test_db01,test_db.T2')
       </td>
     </tr>
@@ -162,8 +160,8 @@ This section introduce all available spark procedures about 
paimon.
             <li>snapshot(Long):  id of the snapshot which the new tag is based 
on.</li>
       </td>
       <td>
-          CALL sys.create_branch(table => 'test_db.T', branch => 
'test_branch')<br/>
-          CALL sys.create_branch(table => 'test_db.T', branch => 
'test_branch', tag => 'my_tag')<br/>
+          CALL sys.create_branch(table => 'test_db.T', branch => 
'test_branch')<br/><br/>
+          CALL sys.create_branch(table => 'test_db.T', branch => 
'test_branch', tag => 'my_tag')<br/><br/>
           CALL sys.create_branch(table => 'test_db.T', branch => 
'test_branch', snapshot => 10)
       </td>
     </tr>
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java
index 5d9403044..32c3699c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/Statistics.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Experimental;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.OptionalUtils;
@@ -38,6 +39,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalLong;
+import java.util.stream.Collectors;
 
 /**
  * Global stats, supports the following stats.
@@ -119,18 +121,16 @@ public class Statistics {
     public void serializeFieldsToString(TableSchema schema) {
         try {
             if (colStats != null) {
+                Map<String, DataType> fields =
+                        schema.fields().stream()
+                                .collect(Collectors.toMap(DataField::name, 
DataField::type));
                 for (Map.Entry<String, ColStats<?>> entry : 
colStats.entrySet()) {
                     String colName = entry.getKey();
                     ColStats<?> colStats = entry.getValue();
-                    DataType type =
-                            schema.fields().stream()
-                                    .filter(field -> 
field.name().equals(colName))
-                                    .findFirst()
-                                    .orElseThrow(
-                                            () ->
-                                                    new IllegalStateException(
-                                                            "Unable to obtain 
the latest schema"))
-                                    .type();
+                    DataType type = fields.get(colName);
+                    if (type == null) {
+                        throw new IllegalStateException("Unable to obtain the 
latest schema");
+                    }
                     colStats.serializeFieldsToString(type);
                 }
             }
@@ -142,18 +142,16 @@ public class Statistics {
     public void deserializeFieldsFromString(TableSchema schema) {
         try {
             if (colStats != null) {
+                Map<String, DataType> fields =
+                        schema.fields().stream()
+                                .collect(Collectors.toMap(DataField::name, 
DataField::type));
                 for (Map.Entry<String, ColStats<?>> entry : 
colStats.entrySet()) {
                     String colName = entry.getKey();
                     ColStats<?> colStats = entry.getValue();
-                    DataType type =
-                            schema.fields().stream()
-                                    .filter(field -> 
field.name().equals(colName))
-                                    .findFirst()
-                                    .orElseThrow(
-                                            () ->
-                                                    new IllegalStateException(
-                                                            "Unable to obtain 
the latest schema"))
-                                    .type();
+                    DataType type = fields.get(colName);
+                    if (type == null) {
+                        throw new IllegalStateException("Unable to obtain the 
latest schema");
+                    }
                     colStats.deserializeFieldsFromString(type);
                 }
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
index 0ca23fad5..f9e057c7c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
@@ -56,7 +56,7 @@ public class StatsFileHandler {
     public Optional<Statistics> readStats() {
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
         if (latestSnapshotId == null) {
-            throw new IllegalStateException("Unable to obtain the latest 
schema");
+            throw new IllegalStateException("Unable to obtain the latest 
snapshot");
         }
         return readStats(latestSnapshotId);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 7696f9ada..a49e6dbc4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -106,11 +106,11 @@ public class FileStorePathFactory {
 
     public Path relativePartitionAndBucketPath(BinaryRow partition, int 
bucket) {
         String partitionPath = getPartitionString(partition);
-        if (partitionPath.isEmpty()) {
-            return new Path(BUCKET_PATH_PREFIX + bucket);
-        } else {
-            return new Path(getPartitionString(partition) + "/" + 
BUCKET_PATH_PREFIX + bucket);
-        }
+        String fullPath =
+                partitionPath.isEmpty()
+                        ? BUCKET_PATH_PREFIX + bucket
+                        : partitionPath + "/" + BUCKET_PATH_PREFIX + bucket;
+        return new Path(fullPath);
     }
 
     /** IMPORTANT: This method is NOT THREAD SAFE. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
index 969dbbf9d..6bf6b3517 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/ScanParallelExecutor.java
@@ -45,7 +45,7 @@ public class ScanParallelExecutor {
         if (queueSize == null) {
             queueSize = COMMON_IO_FORK_JOIN_POOL.getParallelism();
         } else if (queueSize <= 0) {
-            throw new NegativeArraySizeException("queue size should not be 
negetive");
+            throw new NegativeArraySizeException("queue size should not be 
negative");
         }
 
         final Queue<List<U>> stack = new ArrayDeque<>(Lists.partition(input, 
queueSize));

Reply via email to