Roc Marshal created HUDI-1737:
---------------------------------

             Summary: Extract public method in HoodieCreateHandle & 
FlinkCreateHandle
                 Key: HUDI-1737
                 URL: https://issues.apache.org/jira/browse/HUDI-1737
             Project: Apache Hudi
          Issue Type: Improvement
          Components: Code Cleanup
            Reporter: Roc Marshal
            Assignee: Roc Marshal


{code:java}
// HoodieCreateHandle.java
//...
@Override
public List<WriteStatus> close() {
  LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done 
with all the records " + recordsWritten);
  try {

    fileWriter.close();

    HoodieWriteStat stat = new HoodieWriteStat();
    stat.setPartitionPath(writeStatus.getPartitionPath());
    stat.setNumWrites(recordsWritten);
    stat.setNumDeletes(recordsDeleted);
    stat.setNumInserts(insertRecordsWritten);
    stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
    stat.setFileId(writeStatus.getFileId());
    stat.setPath(new Path(config.getBasePath()), path);
    long fileSizeInBytes = FSUtils.getFileSize(fs, path);
    stat.setTotalWriteBytes(fileSizeInBytes);
    stat.setFileSizeInBytes(fileSizeInBytes);
    stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
    RuntimeStats runtimeStats = new RuntimeStats();
    runtimeStats.setTotalCreateTime(timer.endTimer());
    stat.setRuntimeStats(runtimeStats);
    writeStatus.setStat(stat);

    LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took 
%d ms.", stat.getPartitionPath(),
        stat.getFileId(), runtimeStats.getTotalCreateTime()));

    return Collections.singletonList(writeStatus);
  } catch (IOException e) {
    throw new HoodieInsertException("Failed to close the Insert Handle for path 
" + path, e);
  }
}


//FlinkCreateHandle.java
private void setUpWriteStatus() throws IOException {
  long fileSizeInBytes = fileWriter.getBytesWritten();
  long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
  this.lastFileSize = fileSizeInBytes;
  HoodieWriteStat stat = new HoodieWriteStat();
  stat.setPartitionPath(writeStatus.getPartitionPath());
  stat.setNumWrites(recordsWritten);
  stat.setNumDeletes(recordsDeleted);
  stat.setNumInserts(insertRecordsWritten);
  stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
  stat.setFileId(writeStatus.getFileId());
  stat.setPath(new Path(config.getBasePath()), path);
  stat.setTotalWriteBytes(incFileSizeInBytes);
  stat.setFileSizeInBytes(fileSizeInBytes);
  stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
  HoodieWriteStat.RuntimeStats runtimeStats = new 
HoodieWriteStat.RuntimeStats();
  runtimeStats.setTotalCreateTime(timer.endTimer());
  stat.setRuntimeStats(runtimeStats);
  writeStatus.setStat(stat);
}{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to