jacklong319 opened a new issue, #7146:
URL: https://github.com/apache/paimon/issues/7146

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Paimon version
   
   1.3.1
   
   ### Compute Engine
   
   Hadoop: 2.7.5
   Flink: 1.19.1
   
   ### Minimal reproduce step
   
   1. Create a Paimon catalog **without** Kerberos configuration:
   ```sql
   CREATE CATALOG paimon_catalog WITH (
       'type' = 'paimon',
       'metastore' = 'hive',
       'uri' = 'thrift://xxx:9083',
       'warehouse' = 'hdfs://nameservice1/user/hive/warehouse/paimon.db'
       -- No security.kerberos.login.keytab or security.kerberos.login.principal
   );
   ```
   2. Write data using Flink (with checkpoint enabled):
   ```sql
   INSERT INTO paimon_catalog.database.table
   SELECT * FROM source_table;
   ```
   3. Wait for a checkpoint to trigger (or trigger one manually)
   4. Observe the errors:
      - TaskManager logs show `InterruptedIOException` during the checkpoint
      - The Flink Web UI shows "Unable to close file because the last block does not have enough number of replicas"
      - The checkpoint fails
   Code to reproduce:
   ```java
   Options options = new Options();
   // No security configuration provided
   SecurityConfiguration config = new SecurityConfiguration(options);
   boolean isLegal = config.isLegal();  // Returns true (incorrect)
   ```
   
   ### What doesn't meet your expectations?
   
   - `isLegal()` should return `false` when no security configuration is 
provided
   - Security wrappers should only be created when Kerberos is actually configured (see the sketch after this list)
   - Non-Kerberos environments should use the native `FileSystem` directly 
without `doAs()` overhead
   - **Checkpoints should succeed without `InterruptedIOException` errors**
   - **HDFS files should close properly without block replica issues**
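   
   What I would expect, as a rough sketch (the class and method names below are illustrative, not the actual Paimon code; the real wrapping call is `HadoopSecuredFileSystem.trySecureFileSystem()`, shown in the workaround further down):
   
   ```java
   // Hedged sketch, not the actual Paimon source: the wrapper factory below is a
   // stand-in for HadoopSecuredFileSystem.trySecureFileSystem(), whose real
   // signature is not reproduced here. The point is only the gating on whether
   // Kerberos is configured.
   import java.io.IOException;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   class FileSystemGatingSketch {
   
       static FileSystem createFileSystem(Path path, Configuration conf, boolean kerberosConfigured)
               throws IOException {
           FileSystem fs = path.getFileSystem(conf);
           if (!kerberosConfigured) {
               // No keytab/principal: hand back the native FileSystem, no doAs() hop.
               return fs;
           }
           // Kerberos is configured: wrap it (stand-in for the real secured wrapper).
           return wrapWithSecuredFileSystem(fs);
       }
   
       private static FileSystem wrapWithSecuredFileSystem(FileSystem fs) {
           // Placeholder for HadoopSecuredFileSystem.trySecureFileSystem(...).
           return fs;
       }
   }
   ```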
   
   Stack trace from the TaskManager logs:
   
   ```
        at org.apache.paimon.fs.hadoop.HadoopSecuredFileSystem.lambda$delete$5(HadoopSecuredFileSystem.java:109) ~[zrc_2635_1.0.59.jar:?]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_201]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_201]
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1937) ~[flink-shaded-hadoop-2-uber-3.3.5-infra-v2-10.0.jar:3.3.5-infra-v2-10.0]
        at org.apache.paimon.fs.hadoop.HadoopSecuredFileSystem.runSecuredWithIOException(HadoopSecuredFileSystem.java:155) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.fs.hadoop.HadoopSecuredFileSystem.delete(HadoopSecuredFileSystem.java:109) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.fs.hadoop.HadoopFileIO.delete(HadoopFileIO.java:209) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.fs.FileIO.deleteQuietly(FileIO.java:246) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.io.SingleFileWriter.abort(SingleFileWriter.java:161) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.io.RollingFileWriter.abort(RollingFileWriter.java:147) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.io.RollingFileWriter.close(RollingFileWriter.java:172) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.mergetree.MergeTreeWriter.flushWriteBuffer(MergeTreeWriter.java:234) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.mergetree.MergeTreeWriter.prepareCommit(MergeTreeWriter.java:253) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:215) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.operation.MemoryFileStoreWrite.prepareCommit(MemoryFileStoreWrite.java:152) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:262) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:150) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.flink.sink.TableWriteOperator.prepareCommit(TableWriteOperator.java:151) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.prepareCommit(RowDataStoreWriteOperator.java:205) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:115) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.paimon.flink.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:95) ~[zrc_2635_1.0.59.jar:?]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89) ~[flink-dist-1.19.1.jar:1.19.1]
   ```
   
   ### Anything else?
   
   **My Understanding:**
   
   1. The `isLegal()` method should indicate whether a valid security 
configuration exists
   2. When no configuration is provided, it should return `false` to avoid 
unnecessary security wrappers
   3. The `ugi.doAs()` wrapper in `HadoopSecuredFileSystem` causes thread interruption handling issues on Hadoop 2.7.5 (see the sketch after this list)
   4. During Flink checkpoints, thread interruption combined with `doAs()` 
leads to:
      - HDFS I/O operations timing out
      - Files not being closed properly
      - HDFS reporting insufficient block replicas
      - Checkpoint failures
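   
   A rough sketch of the wrapping pattern from point 3 (not the actual `HadoopSecuredFileSystem` source; names and signatures are simplified, but the `ugi.doAs()` hop is the one visible in the stack trace above):
   
   ```java
   // Hedged sketch, not the actual Paimon code: every FileSystem call goes
   // through UserGroupInformation.doAs(). On Hadoop 2.7.5, when Flink interrupts
   // the task thread around a checkpoint, the HDFS call inside doAs() surfaces
   // as an InterruptedIOException instead of completing, and the file is never
   // closed cleanly.
   import java.io.IOException;
   import java.security.PrivilegedExceptionAction;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hadoop.security.UserGroupInformation;
   
   class SecuredFileSystemSketch {
   
       private final FileSystem fs;
       private final UserGroupInformation ugi;
   
       SecuredFileSystemSketch(FileSystem fs, UserGroupInformation ugi) {
           this.fs = fs;
           this.ugi = ugi;
       }
   
       boolean delete(Path path, boolean recursive) throws IOException {
           try {
               // The extra doAs() hop that the stack trace above goes through.
               return ugi.doAs(
                       (PrivilegedExceptionAction<Boolean>) () -> fs.delete(path, recursive));
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IOException(e);
           }
       }
   }
   ```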
   
   **My Design:**
   
   - Change the return value from `true` to `false` when no keytab/principal is 
configured
   - Add a comment explaining why we return `false`
   - This ensures backward compatibility (Kerberos environments still work 
correctly)
   - This is a minimal, safe change that fixes the root cause
   
   **POC (local testing):**
   
   I have tested this fix locally and confirmed that:
   - Non-Kerberos environments no longer create unnecessary wrappers
   - Kerberos environments continue to work correctly
   - Checkpoint operations succeed in Hadoop 2.7.5
   - No more `InterruptedIOException` errors
   - HDFS files close properly without block replica issues
   
   **Proposed Solution:**
   
   Change line 96 in `SecurityConfiguration.java` from `return true;` to 
`return false;`:
   
   ```java
   public boolean isLegal() {
       if (StringUtils.isNullOrWhitespaceOnly(keytab)
               != StringUtils.isNullOrWhitespaceOnly(principal)) {
           return false;
       }

       if (!StringUtils.isNullOrWhitespaceOnly(keytab)) {
           File keytabFile = new File(keytab);
           return keytabFile.exists() && keytabFile.isFile() && keytabFile.canRead();
       }

       // Return false when no security configuration is provided.
       // This prevents unnecessary security wrapper creation.
       return false;  // ✅ Fixed
   }
   ```
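   
   A quick check of both cases, as a hedged sketch (the `Options` setter, the import paths, and the Kerberos option keys are assumptions based on the snippets above, not verified against the Paimon API):
   
   ```java
   // Hedged sketch: option keys come from the catalog DDL in the reproduce step;
   // the import paths and setString() are assumptions, not verified against Paimon.
   import org.apache.paimon.options.Options;
   import org.apache.paimon.security.SecurityConfiguration; // package assumed
   
   public class IsLegalFixCheck {
   
       public static void main(String[] args) {
           // Case 1: no security configuration at all. After the fix, isLegal()
           // should return false, so no secured FileSystem wrapper is created.
           Options empty = new Options();
           System.out.println(new SecurityConfiguration(empty).isLegal());
   
           // Case 2: keytab and principal both set. isLegal() should still return
           // true, provided the keytab file exists, is a regular file, and is
           // readable (path and principal below are hypothetical).
           Options kerberos = new Options();
           kerberos.setString("security.kerberos.login.keytab", "/etc/security/keytabs/paimon.keytab");
           kerberos.setString("security.kerberos.login.principal", "paimon@EXAMPLE.COM");
           System.out.println(new SecurityConfiguration(kerberos).isLegal());
       }
   }
   ```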
   
   **Workaround:**
   
   As a temporary workaround, I rolled back to the Paimon 1.0 behavior by modifying `HadoopFileIO.createFileSystem()`:
   ```java
   protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
       return path.getFileSystem(hadoopConf.get());
       // Removed: HadoopSecuredFileSystem.trySecureFileSystem() call
   }
   ```
   This workaround resolves the issue, but it drops the security features entirely. The proper fix is to correct the `isLegal()` logic.
   
   **Understanding:** the `SecurityConfiguration.isLegal()` logic flaw causes unnecessary security wrapper creation, which leads to checkpoint failures on Hadoop 2.7.5.
   
   **Solution:** change line 96 from `return true;` to `return false;` when no keytab/principal is configured.
   
   **POC:** tested locally; checkpoints succeed and the errors are gone.
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

