Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1638#discussion_r156834294
  
    --- Diff: 
streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
 ---
    @@ -180,6 +182,70 @@ public static String close(CarbonTable table, String 
segmentId)
         }
       }
     
    +  /**
    +   * change the status of the segment from "streaming" to "streaming 
finish"
    +   */
    +  public static void finishStreaming(CarbonTable carbonTable) throws 
Exception {
    +    ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
    +        carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
    +        LockUsage.TABLE_STATUS_LOCK);
    +    try {
    +      if (lock.lockWithRetries()) {
    +        ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
    +            
carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
    +            LockUsage.STREAMING_LOCK);
    +        try {
    +          if (streamingLock.lockWithRetries()) {
    +            LoadMetadataDetails[] details =
    +                
SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
    +            boolean updated = false;
    +            for (LoadMetadataDetails detail : details) {
    +              if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
    +                detail.setLoadEndTime(System.currentTimeMillis());
    +                detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
    +                updated = true;
    +              }
    +            }
    +            if (updated) {
    +              CarbonTablePath tablePath =
    +                  
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
    +              SegmentStatusManager.writeLoadDetailsIntoFile(
    +                  tablePath.getTableStatusFilePath(), details);
    +            }
    +          } else {
    +            String msg = "Failed to finish streaming, because streaming is 
locked for table " +
    +                carbonTable.getDatabaseName() + "." + 
carbonTable.getTableName();
    +            LOGGER.error(msg);
    +            throw new Exception(msg);
    +          }
    +        } finally {
    +          if (streamingLock.unlock()) {
    +            LOGGER.info("Table unlocked successfully after streaming 
finished" + carbonTable
    +                .getDatabaseName() + "." + carbonTable.getTableName());
    +          } else {
    +            LOGGER.error("Unable to unlock Table lock for table " +
    +                carbonTable.getDatabaseName() + "." + 
carbonTable.getTableName() +
    +                " during streaming finished");
    +          }
    +        }
    +      } else {
    +        String msg = "Failed to acquire table status lock of " +
    +            carbonTable.getDatabaseName() + "." + 
carbonTable.getTableName();
    +        LOGGER.error(msg);
    +        throw new Exception(msg);
    --- End diff --
    
    fixed


---

Reply via email to