QiangCai commented on a change in pull request #3785: URL: https://github.com/apache/carbondata/pull/3785#discussion_r456179554
########## File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala ########## @@ -104,73 +104,80 @@ class MergeIndexEventListener extends OperationEventListener with Logging { case alterTableMergeIndexEvent: AlterTableMergeIndexEvent => val carbonMainTable = alterTableMergeIndexEvent.carbonTable val sparkSession = alterTableMergeIndexEvent.sparkSession - if (!carbonMainTable.isStreamingSink) { - LOGGER.info(s"Merge Index request received for table " + - s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }") - val lock = CarbonLockFactory.getCarbonLockObj( - carbonMainTable.getAbsoluteTableIdentifier, - LockUsage.COMPACTION_LOCK) + LOGGER.info(s"Merge Index request received for table " + + s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }") + val lock = CarbonLockFactory.getCarbonLockObj( + carbonMainTable.getAbsoluteTableIdentifier, + LockUsage.COMPACTION_LOCK) - try { - if (lock.lockWithRetries()) { - LOGGER.info("Acquired the compaction lock for table" + - s" ${ carbonMainTable.getDatabaseName }.${ - carbonMainTable - .getTableName - }") - val segmentsToMerge = - if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) { - val validSegments = - CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala - val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]() - validSegments.foreach { segment => + try { + if (lock.lockWithRetries()) { + LOGGER.info("Acquired the compaction lock for table" + + s" ${ carbonMainTable.getDatabaseName }.${ + carbonMainTable + .getTableName + }") Review comment: combine these lines into one line ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala ########## @@ -104,73 +104,80 @@ class MergeIndexEventListener extends OperationEventListener with Logging { case alterTableMergeIndexEvent: AlterTableMergeIndexEvent => val carbonMainTable = 
alterTableMergeIndexEvent.carbonTable val sparkSession = alterTableMergeIndexEvent.sparkSession - if (!carbonMainTable.isStreamingSink) { - LOGGER.info(s"Merge Index request received for table " + - s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }") - val lock = CarbonLockFactory.getCarbonLockObj( - carbonMainTable.getAbsoluteTableIdentifier, - LockUsage.COMPACTION_LOCK) + LOGGER.info(s"Merge Index request received for table " + + s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }") + val lock = CarbonLockFactory.getCarbonLockObj( + carbonMainTable.getAbsoluteTableIdentifier, + LockUsage.COMPACTION_LOCK) - try { - if (lock.lockWithRetries()) { - LOGGER.info("Acquired the compaction lock for table" + - s" ${ carbonMainTable.getDatabaseName }.${ - carbonMainTable - .getTableName - }") - val segmentsToMerge = - if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) { - val validSegments = - CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala - val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]() - validSegments.foreach { segment => + try { + if (lock.lockWithRetries()) { + LOGGER.info("Acquired the compaction lock for table" + + s" ${ carbonMainTable.getDatabaseName }.${ + carbonMainTable + .getTableName + }") + val loadFolderDetailsArray = SegmentStatusManager + .readLoadMetadata(carbonMainTable.getMetadataPath) + val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String, + String]() + var streamingSegment: Set[String] = Set[String]() + loadFolderDetailsArray.foreach(loadMetadataDetails => { + if (loadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) { + streamingSegment += loadMetadataDetails.getLoadName + } + segmentFileNameMap + .put(loadMetadataDetails.getLoadName, + String.valueOf(loadMetadataDetails.getLoadStartTime)) + }) + val segmentsToMerge = + if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) { + val 
validSegments = + CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala + val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]() + validSegments.foreach { segment => + // do not add ROW_V1 format + if (!segment.getLoadMetadataDetails.getFileFormat.equals(FileFormat.ROW_V1)) { validSegmentIds += segment.getSegmentNo } - validSegmentIds - } else { - alterTableMergeIndexEvent.alterTableModel.customSegmentIds.get } - - val loadFolderDetailsArray = SegmentStatusManager - .readLoadMetadata(carbonMainTable.getMetadataPath) - val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String, - String]() - loadFolderDetailsArray.foreach(loadMetadataDetails => { - segmentFileNameMap - .put(loadMetadataDetails.getLoadName, - String.valueOf(loadMetadataDetails.getLoadStartTime)) - }) - // in case of merge index file creation using Alter DDL command - // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy - // store (store <= 1.1 version) and create merge Index file as per new store so that - // old store is also upgraded to new store - val startTime = System.currentTimeMillis() - CarbonMergeFilesRDD.mergeIndexFiles( - sparkSession = sparkSession, - segmentIds = segmentsToMerge, - segmentFileNameToSegmentIdMap = segmentFileNameMap, - tablePath = carbonMainTable.getTablePath, - carbonTable = carbonMainTable, - mergeIndexProperty = true, - readFileFooterFromCarbonDataFile = true) - LOGGER.info("Total time taken for merge index " - + (System.currentTimeMillis() - startTime) + "ms") - // clear Block index Cache - MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge) - val requestMessage = "Compaction request completed for table " + - s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }" - LOGGER.info(requestMessage) - } else { - val lockMessage = "Not able to acquire the compaction lock for table " + - s"${ carbonMainTable.getDatabaseName }." 
+ - s"${ carbonMainTable.getTableName}" - LOGGER.error(lockMessage) - CarbonException.analysisException( - "Table is already locked for compaction. Please try after some time.") - } - } finally { - lock.unlock() + validSegmentIds + } else { + alterTableMergeIndexEvent.alterTableModel + .customSegmentIds + .get + .filter(!streamingSegment.contains(_)) Review comment: .filterNot(streamingSegment.contains(_)) ########## File path: docs/ddl-of-carbondata.md ########## @@ -750,10 +750,6 @@ Users can specify which columns to include and exclude for local dictionary gene ALTER TABLE test_db.carbon COMPACT 'SEGMENT_INDEX' ``` - **NOTE:** - - * Merge index is not supported on streaming table. Review comment: Merge index has been supported on streaming tables since version 2.1, but the index files of streaming segments are still not merged. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org