[ https://issues.apache.org/jira/browse/CARBONDATA-906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965585#comment-15965585 ]
Crabo Yang edited comment on CARBONDATA-906 at 4/12/17 9:13 AM: ---------------------------------------------------------------- 1.oozie spark-opts <spark-opts> --jars rds.importer-1.0-SNAPSHOT.jar,carbondata_2.10-1.0.0-incubating-shade-hadoop2.6.0-cdh5.7.0.jar --num-executors 12 --executor-cores 4 --executor-memory 13G --conf spark.yarn.executor.memoryOverhead=5120 --conf spark.executor.heartbeatInterval=10000000 --conf spark.network.timeout=10000000 </spark-opts> 2.create script CREATE TABLE IF NOT EXISTS dmp_trade(id STRING,buyerNick STRING,buyerAlipayNO STRING,clientType STRING,sellerNick STRING,receiverName STRING,receiverMobile STRING,receiverPhone STRING,receiverCountry STRING,receiverState STRING,receiverCity STRING,receiverDistrict STRING,receiverTown STRING,receiverAddress STRING,receiverZip STRING,status STRING,tradeFrom STRING,type STRING,stepTradeStatus STRING,shippingType STRING,title STRING,buyerMessage STRING,buyerMemo STRING,rxAuditStatus STRING,buyerEmail STRING,picPath STRING,shopPick STRING,creditCardFee STRING,markDesc STRING,sellerMemo STRING,invoiceName STRING,invoiceType STRING,tradeAttr STRING,esRange STRING,esDate STRING,osDate STRING,osRange STRING,o2oSnatchStatus STRING,market STRING,etType STRING,obs STRING,tradeOriginalJson STRING,point STRING,omniAttr STRING,omniParam STRING,identity STRING,omnichannelParam STRING,assembly STRING,tradeId BIGINT,itemId BIGINT,platFormId INT,num INT,sellerFlag INT,naSource INT,etShopId INT,forbidConsign INT,buyerFlag INT,topHold INT,nvoiceKind INT,payment STRING,price STRING,totalFee STRING,discountFee STRING,postFee STRING,stepPaidFee STRING,adjustFee STRING,buyerCodFee STRING,orderTaxFee STRING,couponFee STRING,paidCouponFee STRING,sellerRate STRING,buyerRate STRING,postGateDeclare STRING,crossBondedDeclare STRING,hasBuyerMessage STRING,hasPostFee STRING,isShShip STRING,created TIMESTAMP,payTime TIMESTAMP,modified TIMESTAMP,endTime TIMESTAMP,consignTime TIMESTAMP,estConTime TIMESTAMP) STORED BY 
'carbondata'; 3.carbon.properties #################### System Configuration ################## #Mandatory. Carbon Store path carbon.storelocation=hdfs://master.nascent.com:8020/Opt/CarbonStore #Base directory for Data files carbon.ddl.base.hdfs.url=hdfs://master.nascent.com:8020/opt/data #Path where the bad records are stored carbon.badRecords.location=/opt/Carbon/Spark/badrecords #Mandatory. path to kettle home carbon.kettle.home=/usr/lib/spark/carbonlib/carbonplugins #################### Performance Configuration ################## ######## DataLoading Configuration ######## ##add by xzl carbon.load.use.batch.sort=true enable.unsafe.sort=true offheap.sort.chunk.size.inmb=1024 carbon.load.batch.sort.size.inmb=450 #File read buffer size used during sorting(in MB) :MIN=1:MAX=100 carbon.sort.file.buffer.size=10 #Rowset size exchanged between data load graph steps :MIN=500:MAX=1000000 carbon.graph.rowset.size=10000 #Number of cores to be used while data loading carbon.number.of.cores.while.loading=6 #Record count to sort and write to temp intermediate files carbon.sort.size=500000 #Algorithm for hashmap for hashkey calculation carbon.enableXXHash=true #Number of cores to be used for block sort while dataloading #carbon.number.of.cores.block.sort=7 #max level cache size upto which level cache will be loaded in memory #carbon.max.level.cache.size=-1 #enable prefetch of data during merge sort while reading data from sort temp files in data loading #carbon.merge.sort.prefetch=true ######## Compaction Configuration ######## #Number of cores to be used while compacting carbon.number.of.cores.while.compacting=8 #For minor compaction, Number of segments to be merged in stage 1, number of compacted segments to be merged in stage 2. 
carbon.compaction.level.threshold=4,3 #default size (in MB) for major compaction to be triggered carbon.major.compaction.size=1024 ######## Query Configuration ######## #Number of cores to be used for loading index into memory carbon.number.of.cores=8 #Number of records to be in memory while querying :MIN=100000:MAX=240000 carbon.inmemory.record.size=120000 #Improves the performance of filter query carbon.enable.quick.filter=false ##number of cores to load the blocks in driver #no.of.cores.to.load.blocks.in.driver=10 #################### Extra Configuration ################## ##Timestamp format of input data used for timestamp data type. #carbon.timestamp.format=yyyy-MM-dd HH:mm:ss ######## Dataload Configuration ######## ##File write buffer size used during sorting. #carbon.sort.file.write.buffer.size=10485760 ##Locking mechanism for data loading on a table carbon.lock.type=HDFSLOCK ##Minimum no of intermediate files after which sort merge is to be started. #carbon.sort.intermediate.files.limit=20 ##space reserved in percentage for writing block meta data in carbon data file #carbon.block.meta.size.reserved.percentage=10 ##csv reading buffer size. #carbon.csv.read.buffersize.byte=1048576 ##To identify and apply compression for non-high cardinality columns #high.cardinality.value=100000 ##maximum no of threads used for reading intermediate files for final merging. #carbon.merge.sort.reader.thread=3 ##Carbon blocklet size. Note: this configuration cannot be changed once the store is generated #carbon.blocklet.size=120000 ##number of retries to get the metadata lock for loading data to table #carbon.load.metadata.lock.retries=3 ##Minimum blocklets needed for distribution. 
#carbon.blockletdistribution.min.blocklet.size=10 ##Interval between the retries to get the lock #carbon.load.metadata.lock.retry.timeout.sec=5 ##Temporary store location, By default it will take System.getProperty("java.io.tmpdir") #carbon.tempstore.location=/opt/Carbon/TempStoreLoc ##data loading records count logger #carbon.load.log.counter=500000 ######## Compaction Configuration ######## ##to specify number of segments to be preserved from compaction #carbon.numberof.preserve.segments=0 ##To determine the loads of number of days to be compacted #carbon.allowed.compaction.days=0 ##To enable compaction while data loading #carbon.enable.auto.load.merge=false ######## Query Configuration ######## ##Maximum time allowed for one query to be executed. max.query.execution.time=60 ##Min max is a feature added to enhance query performance. To disable this feature, make it false. carbon.enableMinMax=true ######## Global Dictionary Configurations ######## ##To enable/disable identifying high cardinality during first data loading #high.cardinality.identify.enable=true ##threshold to identify whether a column is high cardinality #high.cardinality.threshold=1000000 ##Percentage to identify whether column cardinality is more than configured percent of total row count #high.cardinality.row.count.percentage=80 ##The property to set the date to be considered as start date for calculating the timestamp. #carbon.cutOffTimestamp=2000-01-01 00:00:00 ##The property to set the timestamp (i.e. millis) conversion to the SECOND, MINUTE, HOUR or DAY level. 
#carbon.timegranularity=SECOND was (Author: crabo): 1.oozie spark-opts <spark-opts> --jars rds.importer-1.0-SNAPSHOT.jar,carbondata_2.10-1.0.0-incubating-shade-hadoop2.6.0-cdh5.7.0.jar --num-executors 12 --executor-cores 4 --executor-memory 13G --conf spark.yarn.executor.memoryOverhead=5120 --conf spark.executor.heartbeatInterval=10000000 --conf spark.network.timeout=10000000 </spark-opts> 2.create script CREATE TABLE IF NOT EXISTS dmp_trade(id STRING,buyerNick STRING,buyerAlipayNO STRING,clientType STRING,sellerNick STRING,receiverName STRING,receiverMobile STRING,receiverPhone STRING,receiverCountry STRING,receiverState STRING,receiverCity STRING,receiverDistrict STRING,receiverTown STRING,receiverAddress STRING,receiverZip STRING,status STRING,tradeFrom STRING,type STRING,stepTradeStatus STRING,shippingType STRING,title STRING,buyerMessage STRING,buyerMemo STRING,rxAuditStatus STRING,buyerEmail STRING,picPath STRING,shopPick STRING,creditCardFee STRING,markDesc STRING,sellerMemo STRING,invoiceName STRING,invoiceType STRING,tradeAttr STRING,esRange STRING,esDate STRING,osDate STRING,osRange STRING,o2oSnatchStatus STRING,market STRING,etType STRING,obs STRING,tradeOriginalJson STRING,point STRING,omniAttr STRING,omniParam STRING,identity STRING,omnichannelParam STRING,assembly STRING,tradeId BIGINT,itemId BIGINT,platFormId INT,num INT,sellerFlag INT,naSource INT,etShopId INT,forbidConsign INT,buyerFlag INT,topHold INT,nvoiceKind INT,payment STRING,price STRING,totalFee STRING,discountFee STRING,postFee STRING,stepPaidFee STRING,adjustFee STRING,buyerCodFee STRING,orderTaxFee STRING,couponFee STRING,paidCouponFee STRING,sellerRate STRING,buyerRate STRING,postGateDeclare STRING,crossBondedDeclare STRING,hasBuyerMessage STRING,hasPostFee STRING,isShShip STRING,created TIMESTAMP,payTime TIMESTAMP,modified TIMESTAMP,endTime TIMESTAMP,consignTime TIMESTAMP,estConTime TIMESTAMP) STORED BY 'carbondata'; 3.carbon.properties #################### System Configuration 
################## #Mandatory. Carbon Store path carbon.storelocation=hdfs://master.nascent.com:8020/Opt/CarbonStore #Base directory for Data files carbon.ddl.base.hdfs.url=hdfs://master.nascent.com:8020/opt/data #Path where the bad records are stored carbon.badRecords.location=/opt/Carbon/Spark/badrecords #Mandatory. path to kettle home carbon.kettle.home=/usr/lib/spark/carbonlib/carbonplugins #################### Performance Configuration ################## ######## DataLoading Configuration ######## ##add by xzl carbon.load.use.batch.sort=true enable.unsafe.sort=true offheap.sort.chunk.size.inmb=1024 carbon.load.batch.sort.size.inmb=450 #File read buffer size used during sorting(in MB) :MIN=1:MAX=100 carbon.sort.file.buffer.size=10 #Rowset size exchanged between data load graph steps :MIN=500:MAX=1000000 carbon.graph.rowset.size=10000 #Number of cores to be used while data loading carbon.number.of.cores.while.loading=6 #Record count to sort and write to temp intermediate files carbon.sort.size=500000 #Algorithm for hashmap for hashkey calculation carbon.enableXXHash=true #Number of cores to be used for block sort while dataloading #carbon.number.of.cores.block.sort=7 #max level cache size upto which level cache will be loaded in memory #carbon.max.level.cache.size=-1 #enable prefetch of data during merge sort while reading data from sort temp files in data loading #carbon.merge.sort.prefetch=true ######## Compaction Configuration ######## #Number of cores to be used while compacting carbon.number.of.cores.while.compacting=8 #For minor compaction, Number of segments to be merged in stage 1, number of compacted segments to be merged in stage 2. 
carbon.compaction.level.threshold=4,3 #default size (in MB) for major compaction to be triggered carbon.major.compaction.size=1024 ######## Query Configuration ######## #Number of cores to be used for loading index into memory carbon.number.of.cores=8 #Number of records to be in memory while querying :MIN=100000:MAX=240000 carbon.inmemory.record.size=120000 #Improves the performance of filter query carbon.enable.quick.filter=false ##number of core to load the blocks in driver #no.of.cores.to.load.blocks.in.driver=10 #################### Extra Configuration ################## ##Timestamp format of input data used for timestamp data type. #carbon.timestamp.format=yyyy-MM-dd HH:mm:ss ######## Dataload Configuration ######## ##File write buffer size used during sorting. #carbon.sort.file.write.buffer.size=10485760 ##Locking mechanism for data loading on a table carbon.lock.type=HDFSLOCK ##Minimum no of intermediate files after which sort merged to be started. #carbon.sort.intermediate.files.limit=20 ##space reserved in percentage for writing block meta data in carbon data file #carbon.block.meta.size.reserved.percentage=10 ##csv reading buffer size. #carbon.csv.read.buffersize.byte=1048576 ##To identify and apply compression for non-high cardinality columns #high.cardinality.value=100000 ##maximum no of threads used for reading intermediate files for final merging. #carbon.merge.sort.reader.thread=3 ##Carbon blocklet size. Note: this configuration cannot be change once store is generated #carbon.blocklet.size=120000 ##number of retries to get the metadata lock for loading data to table #carbon.load.metadata.lock.retries=3 ##Minimum blocklets needed for distribution. 
#carbon.blockletdistribution.min.blocklet.size=10 ##Interval between the retries to get the lock #carbon.load.metadata.lock.retry.timeout.sec=5 ##Temporary store location, By default it will take System.getProperty("java.io.tmpdir") #carbon.tempstore.location=/opt/Carbon/TempStoreLoc ##data loading records count logger #carbon.load.log.counter=500000 ######## Compaction Configuration ######## ##to specify number of segments to be preserved from compaction #carbon.numberof.preserve.segments=0 ##To determine the loads of number of days to be compacted #carbon.allowed.compaction.days=0 ##To enable compaction while data loading #carbon.enable.auto.load.merge=false ######## Query Configuration ######## ##Maximum time allowed for one query to be executed. max.query.execution.time=60 ##Min max is feature added to enhance query performance. To disable this feature, make it false. carbon.enableMinMax=true ######## Global Dictionary Configurations ######## ##To enable/disable identify high cardinality during first data loading #high.cardinality.identify.enable=true ##threshold to identify whether high cardinality column #high.cardinality.threshold=1000000 ##Percentage to identify whether column cardinality is more than configured percent of total row count #high.cardinality.row.count.percentage=80 ##The property to set the date to be considered as start date for calculating the timestamp. #carbon.cutOffTimestamp=2000-01-01 00:00:00 ##The property to set the timestamp (ie milis) conversion to the SECOND, MINUTE, HOUR or DAY level. 
#carbon.timegranularity=SECOND > Always OOM error when importing a large dataset (100 million rows) > ----------------------------------------------------------- > > Key: CARBONDATA-906 > URL: https://issues.apache.org/jira/browse/CARBONDATA-906 > Project: CarbonData > Issue Type: Bug > Components: data-load > Affects Versions: 1.0.0-incubating > Reporter: Crabo Yang > Attachments: carbon.properties > > > java.lang.OutOfMemoryError: GC overhead limit exceeded > at > java.util.concurrent.ConcurrentHashMap$Segment.put(ConcurrentHashMap.java:457) > at > java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1130) > at > org.apache.carbondata.core.cache.dictionary.ColumnReverseDictionaryInfo.addDataToDictionaryMap(ColumnReverseDictionaryInfo.java:101) > at > org.apache.carbondata.core.cache.dictionary.ColumnReverseDictionaryInfo.addDictionaryChunk(ColumnReverseDictionaryInfo.java:88) > at > org.apache.carbondata.core.cache.dictionary.DictionaryCacheLoaderImpl.fillDictionaryValuesAndAddToDictionaryChunks(DictionaryCacheLoaderImpl.java:113) > at > org.apache.carbondata.core.cache.dictionary.DictionaryCacheLoaderImpl.load(DictionaryCacheLoaderImpl.java:81) > at > org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCache.loadDictionaryData(AbstractDictionaryCache.java:236) > at > org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCache.checkAndLoadDictionaryData(AbstractDictionaryCache.java:186) > at > org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache.getDictionary(ReverseDictionaryCache.java:174) > at > org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache.get(ReverseDictionaryCache.java:67) > at > org.apache.carbondata.core.cache.dictionary.ReverseDictionaryCache.get(ReverseDictionaryCache.java:38) > at > org.apache.carbondata.processing.newflow.converter.impl.DictionaryFieldConverterImpl.<init>(DictionaryFieldConverterImpl.java:92) > at > 
org.apache.carbondata.processing.newflow.converter.impl.FieldEncoderFactory.createFieldEncoder(FieldEncoderFactory.java:77) > at > org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl.initialize(RowConverterImpl.java:102) > at > org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl.initialize(DataConverterProcessorStepImpl.java:69) > at > org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl.initialize(SortProcessorStepImpl.java:57) > at > org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl.initialize(DataWriterProcessorStepImpl.java:79) > at > org.apache.carbondata.processing.newflow.DataLoadExecutor.execute(DataLoadExecutor.java:45) > at > org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD$$anon$2.<init>(NewCarbonDataLoadRDD.scala:425) > at > org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD.compute(NewCarbonDataLoadRDD.scala:383) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)