Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1638#discussion_r156834294 --- Diff: streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java --- @@ -180,6 +182,70 @@ public static String close(CarbonTable table, String segmentId) } } + /** + * change the status of the segment from "streaming" to "streaming finish" + */ + public static void finishStreaming(CarbonTable carbonTable) throws Exception { + ICarbonLock lock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(), + LockUsage.TABLE_STATUS_LOCK); + try { + if (lock.lockWithRetries()) { + ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(), + LockUsage.STREAMING_LOCK); + try { + if (streamingLock.lockWithRetries()) { + LoadMetadataDetails[] details = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath()); + boolean updated = false; + for (LoadMetadataDetails detail : details) { + if (SegmentStatus.STREAMING == detail.getSegmentStatus()) { + detail.setLoadEndTime(System.currentTimeMillis()); + detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH); + updated = true; + } + } + if (updated) { + CarbonTablePath tablePath = + CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier()); + SegmentStatusManager.writeLoadDetailsIntoFile( + tablePath.getTableStatusFilePath(), details); + } + } else { + String msg = "Failed to finish streaming, because streaming is locked for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName(); + LOGGER.error(msg); + throw new Exception(msg); + } + } finally { + if (streamingLock.unlock()) { + LOGGER.info("Table unlocked successfully after streaming finished" + carbonTable + .getDatabaseName() + "." + carbonTable.getTableName()); + } else { + LOGGER.error("Unable to unlock Table lock for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + " during streaming finished"); + } + } + } else { + String msg = "Failed to acquire table status lock of " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName(); + LOGGER.error(msg); + throw new Exception(msg); --- End diff -- fixed
---