[jira] [Updated] (CARBONDATA-4042) Insert into select and CTAS launches fewer tasks(task count limited to number of nodes in cluster) even when target table is of no_sort

2020-10-23 Thread Venugopal Reddy K (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-4042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venugopal Reddy K updated CARBONDATA-4042:
--
Description: 
*Issue:*

At present, when we do an insert into select or a create table as select 
(CTAS), we launch only one task per node. Whereas when we run a simple select * 
from table query, the number of tasks launched equals the number of carbondata 
files (CARBON_TASK_DISTRIBUTION defaults to CARBON_TASK_DISTRIBUTION_BLOCK). 

This slows down the load performance of the insert into select and CTAS cases.

Refer to the [Community discussion regarding task 
launch|http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/Discussion-Query-Regarding-Task-launch-mechanism-for-data-load-operations-tt98711.html]

 

*Suggestion:*

Launch the same number of tasks as the select query does for the insert into 
select and CTAS cases when the target table is no-sort.
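As a minimal illustration of the two distribution policies described above (the class and method names here are hypothetical, not CarbonData APIs):

```java
// Hypothetical sketch of the two task-distribution behaviours.
public class TaskSketch {
    enum Distribution { NODE, BLOCK }

    // NODE mimics today's insert-into-select/CTAS behaviour (one task per node);
    // BLOCK mimics CARBON_TASK_DISTRIBUTION_BLOCK used by plain select queries.
    static int taskCount(int numBlocks, int numNodes, Distribution d) {
        switch (d) {
            case NODE:  return Math.min(numBlocks, numNodes); // capped by cluster size
            case BLOCK: return numBlocks;                     // one task per carbondata file
            default:    throw new IllegalArgumentException("unknown distribution");
        }
    }

    public static void main(String[] args) {
        // For 100 carbondata files on a 4-node cluster:
        System.out.println(taskCount(100, 4, Distribution.NODE));  // 4
        System.out.println(taskCount(100, 4, Distribution.BLOCK)); // 100
    }
}
```

With 100 carbondata files on a 4-node cluster, the NODE policy caps the load at 4 tasks while a plain select scans with 100, which is the gap the suggestion aims to close.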


SI creation

1. DDL -> Parser -> CarbonCreateSecondaryIndexCommand:
   - Do all validations (list the important ones).
   - acquireLockForSecondaryIndexCreation(): acquire locks (compact, meta, dele_seg).
   - Prepare tableInfo for the SI table (prepare the column schema, set the position reference as sort column, inherit local dictionary from the main table).
   - addIndexInfoToParentTable (create the indexInfo and add it to the main table).
   - CreateTablePreExecutionEvent (for ACL work).
   - Create the SI table (sparkSession.sql(create ...)).
   - addIndexTableInfo, refreshTable for the index table, add the indexInfo to the Hive metastore as SerDe.
   - addOrModifyTableProperty(indexTableExists -> true) and refresh the table (refresh catalog table).
2. Try load: LoadDataForSecondaryIndex
   1. Prepare the load model for the SI.
   2. Read table status and set it into the load model.
   3. If the load metadata is empty, just return; else start the load to the SI.
   4. getValidSegments; if there are any, go ahead, else return.
   5. Prepare segmentIdToLoadStartTimeMapping and the secondary index model.
   6. Create an executor service based on the thread pool size for parallel load of segments to the SI.
   7. LoadTableSIPreExecutionEvent (ACL load events).
   8. Try to get the segment lock for all valid segments; if acquired for all, add to the valid list, else add to the skipped segment list.
   9. Start the load for the valid segments; update the SI table status to in-progress.
   10. If the sort scope is not global sort:
       - CarbonSecondaryIndexRDD: internalGetPartitions -> prepareInputFormat and getSplits(); internalCompute -> sort blocks, prepareTaskBlockMap, prepare CarbonSecondaryIndexExecutor.
       - exec.processTableBlocks (prepare the query model, execute the query and return an iterator).
       - SecondaryIndexQueryResultProcessor (prepare segment properties from the query result).
       - SecondaryIndexQueryResultProcessor.processQueryResult: init temp locations, sort data rows; processResult (sorts the data in the iterators), prepareRowObjectForSorting, addRowForSorting and startSorting.
       - initializeFinalThreadMergerForMergeSort(); initDataHandler(); readAndLoadDataFromSortTempFiles(); write the carbon files to the index table store path.
       - writeSegmentFile; get the load result from the future and build the success and failed segment lists.
       - If the failed segment list is not empty: if (isCompactionCall || !isLoadToFailedSISegments) fail the SI load; else just mark the segments for delete and let the next load take care of them.
       Else (global sort):
       - Create the projections list including the position reference, create a DataFrame from the main table, loadDataUsingGlobalSort, writeSegmentFile; get the load result from the future and build the success and failed segment lists, with the same failure handling as above.
   11. If (successSISegments.nonEmpty && !isCompactionCall):
       - Update status to in-progress (can avoid this).
       - mergeIndexFiles, writeSegmentFile (can be avoided; shreelekya working on it).
       - Read the table status file and prepare the load model for merging data files.
       - mergeDataFilesSISegments -> scanSegmentsAndSubmitJob -> triggerCompaction -> CarbonSIRebuildRDD: internalGetPartitions -> prepareInputFormat and getSplits(); internalCompute -> CarbonCompactionExecutor.processTableBlocks(); close (delete old data files).
       - deleteOldIndexOrMergeIndexFiles; writeSegmentFile for each merged segment.
       - updateTableStatusFile: readTableStatusFile, writeLoadDetailsIntoFile (write the updated, new index and data size into the table status file).
       - mergeIndexFiles for the newly generated index files of the merged data files.
       - If the IndexServer is enabled, clear its cache, else clear the driver cache.
   12. Update the table status to success.
   13. If (!isCompactionCall): triggerPrepriming (trigger pre-priming for the SI).
   14. If (failedSISegments.nonEmpty && !isCompactionCall): update the table status to marked-for-delete.
   15. If (!isCompactionCall): LoadTableSIPostExecutionEvent.
   16. If the skipped segment list is not empty, set isSITableEnabled to false.
   17. deleteLoadsAndUpdateMetadata.
   18. Release the segment locks.
3. If checkMainTableSegEqualToSISeg, set isSITableEnabled to true.
4. CreateTablePostExecutionEvent.
5. releaseLocks (meta, dele_seg, compact).
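The success/failure branching in the SI load notes above can be condensed as the following sketch; the class, method, and return values are illustrative, not the actual CarbonData code:

```java
import java.util.List;

// Hypothetical condensation of the SI-load failure handling: once the
// per-segment load futures complete, decide the outcome for the SI load.
public class SiLoadResultSketch {
    static String handleFailures(List<String> failedSegments,
                                 boolean isCompactionCall,
                                 boolean isLoadToFailedSISegments) {
        if (!failedSegments.isEmpty()) {
            if (isCompactionCall || !isLoadToFailedSISegments) {
                return "FAIL_SI_LOAD";       // fail the whole SI load
            }
            return "MARKED_FOR_DELETE";      // the next load retries these segments
        }
        return "SUCCESS";
    }

    public static void main(String[] args) {
        System.out.println(handleFailures(List.of("seg1"), true, false));  // FAIL_SI_LOAD
        System.out.println(handleFailures(List.of("seg1"), false, true));  // MARKED_FOR_DELETE
        System.out.println(handleFailures(List.of(), false, false));       // SUCCESS
    }
}
```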


Refresh issue:
1. Refresh is called 3 times; avoid it.
2. Check whether the dummy is required or not.
3. It inherits the same sort.

[jira] [Updated] (CARBONDATA-4042) Insert into select and CTAS launches fewer tasks(task count limited to number of nodes in cluster) even when target table is of no_sort

2020-10-23 Thread Venugopal Reddy K (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-4042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Venugopal Reddy K updated CARBONDATA-4042:
--
Summary: Insert into select and CTAS launches fewer tasks(task count 
limited to number of nodes in cluster) even when target table is of no_sort  
(was: Insert into select and CTAS launches fewer tasks(limited to max nodes) 
even when target table is of no_sort)

> Insert into select and CTAS launches fewer tasks(task count limited to number 
> of nodes in cluster) even when target table is of no_sort
> ---
>
> Key: CARBONDATA-4042
> URL: https://issues.apache.org/jira/browse/CARBONDATA-4042
> Project: CarbonData
>  Issue Type: Improvement
>  Components: data-load, spark-integration
>Reporter: Venugopal Reddy K
>Priority: Major
>
> *Issue:*
> At present, when we do an insert into select or a create table as select 
> (CTAS), we launch only one task per node. Whereas when we run a simple 
> select * from table query, the number of tasks launched equals the number of 
> carbondata files (CARBON_TASK_DISTRIBUTION defaults to 
> CARBON_TASK_DISTRIBUTION_BLOCK). 
> This slows down the load performance of the insert into select and CTAS cases.
> Refer to the [Community discussion regarding task 
> launch|http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/Discussion-Query-Regarding-Task-launch-mechanism-for-data-load-operations-tt98711.html]
>  
> *Suggestion:*
> Launch the same number of tasks as the select query does for the insert into 
> select and CTAS cases when the target table is no-sort.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)