[CARBONDATA-2064] Add compaction listener This closes #1847
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/54a381c2 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/54a381c2 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/54a381c2 Branch: refs/heads/branch-1.3 Commit: 54a381c27024ece07d400a4a1d36917bd3ca09f9 Parents: 1202e20 Author: dhatchayani <dhatcha.offic...@gmail.com> Authored: Tue Jan 23 15:26:26 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Feb 1 22:20:33 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 7 - .../hadoop/api/CarbonOutputCommitter.java | 32 ++-- .../sdv/generated/MergeIndexTestCase.scala | 30 ++-- .../CarbonIndexFileMergeTestCase.scala | 48 +++--- .../dataload/TestGlobalSortDataLoad.scala | 2 +- .../StandardPartitionTableLoadingTestCase.scala | 5 - .../carbondata/events/AlterTableEvents.scala | 14 +- .../spark/rdd/CarbonMergeFilesRDD.scala | 84 ---------- .../carbondata/spark/util/CommonUtil.scala | 51 ------ .../spark/rdd/CarbonDataRDDFactory.scala | 14 -- .../spark/rdd/CarbonTableCompactor.scala | 2 - .../CarbonAlterTableCompactionCommand.scala | 165 +++++++++---------- .../sql/execution/strategy/DDLStrategy.scala | 17 -- .../CarbonGetTableDetailComandTestCase.scala | 6 +- .../processing/loading/events/LoadEvents.java | 12 ++ .../processing/merger/CompactionType.java | 1 - 16 files changed, 155 insertions(+), 335 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 
77e8db8..7ae3034 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1478,13 +1478,6 @@ public final class CarbonCommonConstants { public static final String BITSET_PIPE_LINE_DEFAULT = "true"; - /** - * It is internal configuration and used only for test purpose. - * It will merge the carbon index files with in the segment to single segment. - */ - public static final String CARBON_MERGE_INDEX_IN_SEGMENT = "carbon.merge.index.in.segment"; - - public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true"; public static final String AGGREGATIONDATAMAPSCHEMA = "AggregateDataMapHandler"; /* http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 9cca1bb..555ddd2 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -25,18 +25,15 @@ import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.PartitionMapFileStore; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import 
org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonSessionInfo; import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter; import org.apache.carbondata.events.OperationContext; import org.apache.carbondata.events.OperationListenerBus; import org.apache.carbondata.processing.loading.events.LoadEvents; @@ -126,7 +123,16 @@ public class CarbonOutputCommitter extends FileOutputCommitter { } } CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); - mergeCarbonIndexFiles(segmentPath); + if (operationContext != null) { + LoadEvents.LoadTableMergePartitionEvent loadTableMergePartitionEvent = + new LoadEvents.LoadTableMergePartitionEvent(segmentPath); + try { + OperationListenerBus.getInstance() + .fireEvent(loadTableMergePartitionEvent, (OperationContext) operationContext); + } catch (Exception e) { + throw new IOException(e); + } + } String updateTime = context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null); String segmentsToBeDeleted = @@ -158,24 +164,6 @@ public class CarbonOutputCommitter extends FileOutputCommitter { } /** - * Merge index files to a new single file. - */ - private void mergeCarbonIndexFiles(String segmentPath) throws IOException { - boolean mergeIndex = false; - try { - mergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)); - } catch (Exception e) { - mergeIndex = Boolean.parseBoolean( - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT); - } - if (mergeIndex) { - new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath); - } - } - - /** * Update the tablestatus as fail if any fail happens. 
* * @param context http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala index cb0d02c..8e71257 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala @@ -29,6 +29,7 @@ import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter /** * Test Class for AlterTableTestCase to verify all scenerios @@ -40,34 +41,30 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { override protected def afterAll(): Unit = { sql("DROP TABLE IF EXISTS nonindexmerge") sql("DROP TABLE IF EXISTS indexmerge") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") } test("Verify correctness of index merge sdv") { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") sql(s"""drop table if exists carbon_automation_nonmerge""").collect sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName 
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge 
OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") sql("DROP TABLE IF EXISTS carbon_automation_merge") sql(s"""create table carbon_automation_merge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince 
string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_merge 
OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect + val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge") + val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) + new CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false) assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0) checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), sql("""Select count(*) from carbon_automation_merge""")) } test("Verify command of index merge sdv") { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") sql(s"""drop table if exists carbon_automation_nonmerge""").collect sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor 
string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect @@ -77,17 +74,18 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { val rows = sql("""Select count(*) from carbon_automation_nonmerge""").collect() assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2) - 
CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") - sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'SEGMENT_INDEX'").collect() + val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge") + val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) + new CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false) + new CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 0) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 0) checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows) } test("Verify index index merge with compaction sdv") { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") sql(s"""drop table if exists carbon_automation_nonmerge""").collect sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer 
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect @@ -99,9 +97,11 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect() + val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge") + val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) + new CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 0) checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows) } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala index c66107f..895b0b5 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala @@ -20,12 +20,11 @@ package org.apache.carbondata.spark.testsuite.datacompaction import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.CarbonMetadata -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter class CarbonIndexFileMergeTestCase extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { @@ -40,13 +39,9 @@ class CarbonIndexFileMergeTestCase CompactionSupportGlobalSortBigFileTest.deleteFile(file2) sql("DROP TABLE IF EXISTS nonindexmerge") sql("DROP TABLE IF EXISTS indexmerge") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") } test("Verify correctness of index merge") { - CarbonProperties.getInstance() - 
.addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") sql("DROP TABLE IF EXISTS nonindexmerge") sql( """ @@ -57,8 +52,6 @@ class CarbonIndexFileMergeTestCase sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " + s"'GLOBAL_SORT_PARTITIONS'='100')") assert(getIndexFileCount("default_nonindexmerge", "0") == 100) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") sql("DROP TABLE IF EXISTS indexmerge") sql( """ @@ -68,14 +61,16 @@ class CarbonIndexFileMergeTestCase """.stripMargin) sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " + s"'GLOBAL_SORT_PARTITIONS'='100')") + val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge") + val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) + new CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false) assert(getIndexFileCount("default_indexmerge", "0") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), sql("""Select count(*) from indexmerge""")) } test("Verify command of index merge") { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") sql("DROP TABLE IF EXISTS nonindexmerge") sql( """ @@ -90,17 +85,18 @@ class CarbonIndexFileMergeTestCase val rows = sql("""Select count(*) from nonindexmerge""").collect() assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") - sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect() + val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") + val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) + new 
CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false) + new CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false) assert(getIndexFileCount("default_nonindexmerge", "0") == 0) assert(getIndexFileCount("default_nonindexmerge", "1") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) } test("Verify command of index merge without enabling property") { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") sql("DROP TABLE IF EXISTS nonindexmerge") sql( """ @@ -115,15 +111,18 @@ class CarbonIndexFileMergeTestCase val rows = sql("""Select count(*) from nonindexmerge""").collect() assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) - sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect() + val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") + val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) + new CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false) + new CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false) assert(getIndexFileCount("default_nonindexmerge", "0") == 0) assert(getIndexFileCount("default_nonindexmerge", "1") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) } test("Verify index index merge with compaction") { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") sql("DROP TABLE IF EXISTS nonindexmerge") sql( """ @@ -141,16 +140,16 @@ class CarbonIndexFileMergeTestCase assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) 
assert(getIndexFileCount("default_nonindexmerge", "1") == 100) - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true") sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect() + val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") + val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) + new CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false) assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) } test("Verify index index merge for compacted segments") { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") sql("DROP TABLE IF EXISTS nonindexmerge") sql( """ @@ -172,7 +171,10 @@ class CarbonIndexFileMergeTestCase assert(getIndexFileCount("default_nonindexmerge", "2") == 100) assert(getIndexFileCount("default_nonindexmerge", "3") == 100) sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect() - sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect() + val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") + val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath) + new CarbonIndexFileMergeWriter() + .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false) assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) assert(getIndexFileCount("default_nonindexmerge", "2") == 100) http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala index 50a38f1..0d9e0fd 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala @@ -273,7 +273,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort") val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) val segmentDir = carbonTablePath.getSegmentDir("0", "0") - assertResult(Math.max(4, defaultParallelism) + 1)(new File(segmentDir).listFiles().length) + assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length) } test("Query with small files") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index 31d2598..16f252b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -319,8 +319,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte } test("merge carbon index disable data loading for partition table for three partition column") { - CarbonProperties.getInstance.addProperty( - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") sql( """ | CREATE TABLE mergeindexpartitionthree (empno int, doj Timestamp, @@ -340,9 +338,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte val files = carbonFile.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = CarbonTablePath.isCarbonIndexFile(file.getName) }) - CarbonProperties.getInstance.addProperty( - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT) assert(files.length == 10) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala index ca1948a..671e132 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala @@ -17,7 +17,7 @@ package org.apache.carbondata.events import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping} +import org.apache.spark.sql.execution.command._ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -203,3 +203,15 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession, carbonTable: CarbonTable, carbonMergerMapping: CarbonMergerMapping, mergedLoadName: String) extends Event with AlterTableCompactionEventInfo + + +/** + * Compaction Event for handling exception in compaction + * + * @param sparkSession + * @param carbonTable + * @param alterTableModel + */ +case class AlterTableCompactionExceptionEvent(sparkSession: SparkSession, + carbonTable: CarbonTable, + alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala deleted file mode 100644 index 1087ea7..0000000 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.rdd - -import org.apache.spark.{Partition, SparkContext, TaskContext} - -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter - -case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentPath: String) - extends Partition { - - override val index: Int = idx - - override def hashCode(): Int = 41 * (41 + rddId) + idx -} - -/** - * RDD to merge all carbonindex files of each segment to carbonindex file into the same segment. - * @param sc - * @param tablePath - * @param segments segments to be merged - */ -class CarbonMergeFilesRDD( - sc: SparkContext, - tablePath: String, - segments: Seq[String], - readFileFooterFromCarbonDataFile: Boolean) - extends CarbonRDD[String](sc, Nil) { - - override def getPartitions: Array[Partition] = { - segments.zipWithIndex.map {s => - CarbonMergeFilePartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1)) - }.toArray - } - - override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = { - val iter = new Iterator[String] { - val split = theSplit.asInstanceOf[CarbonMergeFilePartition] - logInfo("Merging carbon index files of segment : " + split.segmentPath) - - new CarbonIndexFileMergeWriter() - .mergeCarbonIndexFilesOfSegment(split.segmentPath, readFileFooterFromCarbonDataFile) - - var havePair = false - var finished = false - - override def hasNext: Boolean = { - if (!finished && !havePair) { - finished = true - havePair = !finished - } - !finished - } - - 
override def next(): String = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - havePair = false - "" - } - - } - iter - } - -} - http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index d96a051..b44a0fb 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -55,7 +55,6 @@ import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingExcep import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD object CommonUtil { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -891,54 +890,4 @@ object CommonUtil { (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim)) } - /** - * Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment - * - * @param sparkContext - * @param segmentIds - * @param tablePath - * @param carbonTable - * @param mergeIndexProperty - * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata - * file. 
This will used in case of upgrade from version - * which do not store the blocklet info to current version - */ - def mergeIndexFiles(sparkContext: SparkContext, - segmentIds: Seq[String], - tablePath: String, - carbonTable: CarbonTable, - mergeIndexProperty: Boolean, - readFileFooterFromCarbonDataFile: Boolean = false): Unit = { - if (mergeIndexProperty) { - new CarbonMergeFilesRDD( - sparkContext, - carbonTable.getTablePath, - segmentIds, - readFileFooterFromCarbonDataFile).collect() - } else { - try { - CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT).toBoolean - if (CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) { - new CarbonMergeFilesRDD( - sparkContext, - carbonTable.getTablePath, - segmentIds, - readFileFooterFromCarbonDataFile).collect() - } - } catch { - case _: Exception => - if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) { - new CarbonMergeFilesRDD( - sparkContext, - carbonTable.getTablePath, - segmentIds, - readFileFooterFromCarbonDataFile).collect() - } - } - } - } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 3de0e70..5c43d58 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -103,18 +103,6 @@ object CarbonDataRDDFactory { LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName 
}" + s".${ carbonLoadModel.getTableName }") try { - if (compactionType == CompactionType.SEGMENT_INDEX) { - // Just launch job to merge index and return - CommonUtil.mergeIndexFiles( - sqlContext.sparkContext, - CarbonDataMergerUtil.getValidSegmentList( - carbonTable.getAbsoluteTableIdentifier).asScala, - carbonLoadModel.getTablePath, - carbonTable, - true) - lock.unlock() - return - } startCompactionThreads( sqlContext, carbonLoadModel, @@ -359,8 +347,6 @@ object CarbonDataRDDFactory { } else { loadDataFile(sqlContext, carbonLoadModel, hadoopConf) } - CommonUtil.mergeIndexFiles(sqlContext.sparkContext, - Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false) val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus] if (status.nonEmpty) { status.foreach { eachLoadStatus => http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index 8406d8d..bfe4e41 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -221,8 +221,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, if (finalMergeStatus) { val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) - CommonUtil.mergeIndexFiles( - sc.sparkContext, Seq(mergedLoadNumber), tablePath, carbonTable, false) new PartitionMapFileStore().mergePartitionMapFiles( CarbonTablePath.getSegmentPath(tablePath, mergedLoadNumber), carbonLoadModel.getFactTimeStamp + "") 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index fb0f9fe..2a77826 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -34,16 +34,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} -import 
org.apache.carbondata.spark.exception.ConcurrentOperationException +import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException} import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory import org.apache.carbondata.spark.util.CommonUtil import org.apache.carbondata.streaming.StreamHandoffRDD @@ -90,52 +91,74 @@ case class CarbonAlterTableCompactionCommand( LogServiceFactory.getLogService(this.getClass.getName) val tableName = alterTableModel.tableName.toLowerCase val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase) - val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table) - if (isLoadInProgress) { - val message = "Cannot run data loading and compaction on same table concurrently. " + - "Please wait for load to finish" - LOGGER.error(message) - throw new ConcurrentOperationException(message) - } - val carbonLoadModel = new CarbonLoadModel() - carbonLoadModel.setTableName(table.getTableName) - val dataLoadSchema = new CarbonDataLoadSchema(table) - // Need to fill dimension relation - carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - carbonLoadModel.setTableName(table.getTableName) - carbonLoadModel.setDatabaseName(table.getDatabaseName) - carbonLoadModel.setTablePath(table.getTablePath) - - var storeLocation = CarbonProperties.getInstance.getProperty( - CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, - System.getProperty("java.io.tmpdir")) - storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() - // trigger event for compaction - val alterTableCompactionPreEvent: AlterTableCompactionPreEvent = - AlterTableCompactionPreEvent(sparkSession, table, null, null) - OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext) + operationContext.setProperty("compactionException", "true") + var compactionType: CompactionType = null + var compactionException = "true" try { - alterTableForCompaction( - 
sparkSession.sqlContext, - alterTableModel, - carbonLoadModel, - storeLocation, - operationContext) + compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase) } catch { - case e: Exception => - if (null != e.getMessage) { - CarbonException.analysisException( - s"Compaction failed. Please check logs for more info. ${ e.getMessage }") - } else { - CarbonException.analysisException( - "Exception in compaction. Please check logs for more info.") - } + case _: Exception => + val alterTableCompactionExceptionEvent: AlterTableCompactionExceptionEvent = + AlterTableCompactionExceptionEvent(sparkSession, table, alterTableModel) + OperationListenerBus.getInstance + .fireEvent(alterTableCompactionExceptionEvent, operationContext) + compactionException = operationContext.getProperty("compactionException").toString + } + + if (compactionException.equalsIgnoreCase("true") && null == compactionType) { + throw new MalformedCarbonCommandException( + "Unsupported alter operation on carbon table") + } else if (compactionException.equalsIgnoreCase("false")) { + Seq.empty + } else { + val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table) + if (isLoadInProgress) { + val message = "Cannot run data loading and compaction on same table concurrently. 
" + + "Please wait for load to finish" + LOGGER.error(message) + throw new ConcurrentOperationException(message) + } + + val carbonLoadModel = new CarbonLoadModel() + carbonLoadModel.setTableName(table.getTableName) + val dataLoadSchema = new CarbonDataLoadSchema(table) + // Need to fill dimension relation + carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) + carbonLoadModel.setTableName(table.getTableName) + carbonLoadModel.setDatabaseName(table.getDatabaseName) + carbonLoadModel.setTablePath(table.getTablePath) + + var storeLocation = CarbonProperties.getInstance.getProperty( + CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, + System.getProperty("java.io.tmpdir")) + storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() + // trigger event for compaction + val alterTableCompactionPreEvent: AlterTableCompactionPreEvent = + AlterTableCompactionPreEvent(sparkSession, table, null, null) + OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext) + try { + alterTableForCompaction( + sparkSession.sqlContext, + alterTableModel, + carbonLoadModel, + storeLocation, + operationContext) + } catch { + case e: Exception => + if (null != e.getMessage) { + CarbonException.analysisException( + s"Compaction failed. Please check logs for more info. ${ e.getMessage }") + } else { + CarbonException.analysisException( + "Exception in compaction. 
Please check logs for more info.") + } + } + // trigger event for compaction + val alterTableCompactionPostEvent: AlterTableCompactionPostEvent = + AlterTableCompactionPostEvent(sparkSession, table, null, null) + OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext) + Seq.empty } - // trigger event for compaction - val alterTableCompactionPostEvent: AlterTableCompactionPostEvent = - AlterTableCompactionPostEvent(sparkSession, table, null, null) - OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext) - Seq.empty } private def alterTableForCompaction(sqlContext: SQLContext, @@ -225,50 +248,14 @@ case class CarbonAlterTableCompactionCommand( LOGGER.info("Acquired the compaction lock for table" + s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") try { - if (compactionType == CompactionType.SEGMENT_INDEX) { - // Just launch job to merge index and return - CommonUtil.mergeIndexFiles( - sqlContext.sparkContext, - CarbonDataMergerUtil.getValidSegmentList( - carbonTable.getAbsoluteTableIdentifier).asScala, - carbonLoadModel.getTablePath, - carbonTable, - mergeIndexProperty = true, - readFileFooterFromCarbonDataFile = true) - - val carbonMergerMapping = CarbonMergerMapping(carbonTable.getTablePath, - carbonTable.getMetaDataFilepath, - "", - carbonTable.getDatabaseName, - carbonTable.getTableName, - Array(), - carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId, - compactionType, - maxSegmentColCardinality = null, - maxSegmentColumnSchemaList = null, - compactionModel.currentPartitions, - null) - - // trigger event for compaction - val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent = - AlterTableCompactionPreStatusUpdateEvent(sqlContext.sparkSession, - carbonTable, - carbonMergerMapping, - carbonLoadModel, - "") - OperationListenerBus.getInstance - .fireEvent(alterTableCompactionPreStatusUpdateEvent, operationContext) 
- lock.unlock() - } else { - CarbonDataRDDFactory.startCompactionThreads( - sqlContext, - carbonLoadModel, - storeLocation, - compactionModel, - lock, - operationContext - ) - } + CarbonDataRDDFactory.startCompactionThreads( + sqlContext, + carbonLoadModel, + storeLocation, + compactionModel, + lock, + operationContext + ) } catch { case e: Exception => LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index b174b94..83831e3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -100,24 +100,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { .tableExists(TableIdentifier(altertablemodel.tableName, altertablemodel.dbName))(sparkSession) if (isCarbonTable) { - var compactionType: CompactionType = null - try { - compactionType = CompactionType.valueOf(altertablemodel.compactionType.toUpperCase) - } catch { - case _: Exception => - throw new MalformedCarbonCommandException( - "Unsupported alter operation on carbon table") - } - if (CompactionType.MINOR == compactionType || - CompactionType.MAJOR == compactionType || - CompactionType.SEGMENT_INDEX == compactionType || - CompactionType.STREAMING == compactionType || - CompactionType.CLOSE_STREAMING == compactionType) { ExecutedCommandExec(alterTable) :: Nil - } else { - throw new MalformedCarbonCommandException( - "Unsupported alter operation on carbon table") - } } else { throw new 
MalformedCarbonCommandException( "Operation not allowed : " + altertablemodel.alterSql) http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala index 6265d0d..48733dc 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala @@ -42,10 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA assertResult(2)(result.length) assertResult("table_info1")(result(0).getString(0)) - // 2143 is the size of carbon table - assertResult(2143)(result(0).getLong(1)) + // 2096 is the size of carbon table + assertResult(2096)(result(0).getLong(1)) assertResult("table_info2")(result(1).getString(0)) - assertResult(2143)(result(1).getLong(1)) + assertResult(2096)(result(1).getLong(1)) } override def afterAll: Unit = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java index 190c72c..a3fa292 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java @@ -181,4 +181,16 @@ public class LoadEvents { return carbonLoadModel; } } + + 
public static class LoadTableMergePartitionEvent extends Event { + private String segmentPath; + + public LoadTableMergePartitionEvent(String segmentPath) { + this.segmentPath = segmentPath; + } + + public String getSegmentPath() { + return segmentPath; + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java index 39f56a2..9ed87fc 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java @@ -27,7 +27,6 @@ public enum CompactionType { MAJOR, IUD_UPDDEL_DELTA, IUD_DELETE_DELTA, - SEGMENT_INDEX, STREAMING, CLOSE_STREAMING, NONE