Re0 Beatrice created FLINK-21891:
------------------------------------

             Summary: The .staging_xxx directory isn't deleted after writing data to hive table in batch mode
                 Key: FLINK-21891
                 URL: https://issues.apache.org/jira/browse/FLINK-21891
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.12.0
            Reporter: Re0 Beatrice
In Flink 1.12.0, a Flink SQL job using the Blink planner reads data from HBase and writes the results to a Hive table in batch mode. After the job finishes, the .staging_xxx directories are left behind on HDFS:

{noformat}
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616074732697
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616120408195
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616121007337
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616121607484
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616206808142
{noformat}

I found that the following code in {{org.apache.flink.table.filesystem.FileSystemOutputFormat}} causes the problem:

{code:java}
import java.io.File;

@Override
public void finalizeGlobal(int parallelism) {
    try {
        FileSystemCommitter committer =
                new FileSystemCommitter(
                        fsFactory,
                        msFactory,
                        overwrite,
                        tmpPath,
                        partitionColumns.length);
        committer.commitUpToCheckpoint(CHECKPOINT_ID);
    } catch (Exception e) {
        throw new TableException("Exception in finalizeGlobal", e);
    } finally {
        new File(tmpPath.getPath()).delete(); // the buggy line
    }
}
{code}

The cleanup in the {{finally}} block uses {{java.io.File}}, which only operates on the local filesystem, so it cannot delete {{tmpPath}} when it points to a directory on HDFS; the call fails silently. I think the following is more correct:

{code:java}
fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
{code}

Similar code already exists in {{PartitionTempFileManager}}.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
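The silent failure mode described above can be sketched in plain Java, independent of Flink. This is an illustrative demo, not Flink code: the class name is made up, and the path is one of the leftover HDFS staging directories listed in the report, which does not exist on the local disk of a worker machine.

{code:java}
import java.io.File;

public class StagingDeleteDemo {
    public static void main(String[] args) {
        // java.io.File resolves paths against the LOCAL filesystem only.
        // An HDFS staging directory like this one does not exist locally,
        // so both exists() and delete() return false.
        File staging = new File(
                "/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616074732697");

        // delete() reports failure via its return value instead of throwing,
        // so when the return value is ignored (as in finalizeGlobal), the
        // failure goes unnoticed and the HDFS directory is never removed.
        System.out.println("exists locally: " + staging.exists());
        System.out.println("deleted:        " + staging.delete());
    }
}
{code}

This is why the reporter's suggested fix goes through {{fsFactory}}: a Hadoop {{FileSystem}} instance resolves the path against HDFS and can delete the directory recursively.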