[Discussion] Presto Queries leveraging Secondary Index

2021-01-05 Thread VenuReddy
Hi all!

At present, Carbon table queries through the Presto engine do not make use of
indexes (SI, Bloom, etc.) in query processing. I am exploring feasible approaches,
without a query plan rewrite, to make use of secondary indexes (if any are
available), similar to the existing datamap mechanism.

*Option 1:* While Presto gets the splits for the main table, find the suitable SI
table, scan it, get the position references from the SI table, and return the
splits for the main table accordingly.
Tentative Changes:

1. Make a new CoarseGrainIndex implementation for SI.
2. Within the context of CarbondataSplitManager.getSplits() for the main table, in
CarbonInputFormat.getPrunedBlocklets(), we can prune with the new CoarseGrainIndex
implementation for SI (similar to bloom). Inside prune(), identify the best
suitable SI table, use the SDK CarbonReader to scan the identified SI table, and
get the position references matching the predicate. We need to consider reading
the table in multiple threads (see the sketch after this list).
3. Modify the filter expression to append a positionId filter built from the
position references obtained by reading the SI table.
4. In the context of CarbondataPageSource, create the QueryModel with the modified
filter expression.
The rest of the processing remains the same as before.
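To make steps 2 and 3 concrete, here is a minimal sketch. It assumes the SI table
exposes a positionReference column and the main table accepts a positionId filter;
the class and method names below are only illustrative, while CarbonReader and the
expression classes are the existing SDK/core APIs:

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.InExpression;
import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
import org.apache.carbondata.core.scan.expression.logical.AndExpression;
import org.apache.carbondata.sdk.file.CarbonReader;

// Illustrative helper for Option 1, steps 2 and 3.
public class SecondaryIndexPruneSketch {

  // Step 2: scan the chosen SI table with the query predicate and collect the
  // position references it stores for the matching rows.
  public static List<String> readPositionReferences(String siTablePath,
      Expression siFilter) throws Exception {
    List<String> positionRefs = new ArrayList<>();
    CarbonReader reader = CarbonReader.builder(siTablePath, "si_table")
        .projection(new String[] {"positionReference"})  // assumed SI column name
        .filter(siFilter)
        .build();
    while (reader.hasNext()) {
      Object[] row = (Object[]) reader.readNextRow();
      positionRefs.add(String.valueOf(row[0]));
    }
    reader.close();
    return positionRefs;
  }

  // Step 3: append a positionId IN (...) filter to the main-table filter expression.
  public static Expression appendPositionIdFilter(Expression mainFilter,
      List<String> positionRefs) {
    List<Expression> literals = new ArrayList<>();
    for (String ref : positionRefs) {
      literals.add(new LiteralExpression(ref, DataTypes.STRING));
    }
    InExpression positionIdIn = new InExpression(
        new ColumnExpression("positionId", DataTypes.STRING),
        new ListExpression(literals));
    return mainFilter == null ? positionIdIn : new AndExpression(mainFilter, positionIdIn);
  }
}
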
*Advantages:*
1. Can avoid the query plan rewrite and yet make use of SI tables.
2. Can leverage SI with any execution engine.
*Disadvantages:*
1. Reading the SI table in the context of CarbondataSplitManager.getSplits() of the
main table may degrade query performance. We need enough resources to spawn
multiple threads for reading within it.

*Option 2:* Use the Index Server to prune (enable distributed pruning).
Tentative Changes:

1. Make a new CoarseGrainIndex implementation for SI.
2. On the Index Server, during getSplits() for the main table, in the context of
DistributedPruneRDD.internalCompute() (i.e., on the Index Server executors), within
pruneIndexes() we can prune with the new CoarseGrainIndex implementation for SI
(similar to bloom). Inside prune(), identify the best suitable SI table, use
CarbonReader to read the SI table, and get the position references matching the
predicate.
3. Return the extended blocklets for the main table.
4. We need to check how to return/transform the filter expression, i.e., append the
positionId filter built from the position references read from the SI table, and
ship it from the Index Server to the driver along with the pruned blocklets (see
the sketch after this list).
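For the open question in step 4, one possible shape, purely as a sketch, is to ship
the pruned blocklets and the position references back to the driver together and
let the driver rebuild the positionId filter exactly as in Option 1, step 3. The
class below does not exist today; only ExtendedBlocklet is an existing core class:

import java.io.Serializable;
import java.util.List;

import org.apache.carbondata.core.indexstore.ExtendedBlocklet;

// Hypothetical Index Server response for Option 2, step 4: the executors return the
// pruned main-table blocklets together with the SI position references, so the driver
// can append the positionId filter before building the QueryModel.
public class SIPruneResult implements Serializable {
  private final List<ExtendedBlocklet> prunedBlocklets;
  private final List<String> positionReferences;

  public SIPruneResult(List<ExtendedBlocklet> prunedBlocklets,
      List<String> positionReferences) {
    this.prunedBlocklets = prunedBlocklets;
    this.positionReferences = positionReferences;
  }

  public List<ExtendedBlocklet> getPrunedBlocklets() {
    return prunedBlocklets;
  }

  public List<String> getPositionReferences() {
    return positionReferences;
  }
}
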
*Advantages:*
1. Can avoid the query plan rewrite and yet make use of SI tables.
*Disadvantages:*
1. Index Server executor memory would be occupied for SI table reading.
2. Concurrent queries may be impacted as the Index Server is used for SI table
reading.
3. The Index Server must be running.

We can introduce a new Carbon property to switch between the present behaviour and
the newly proposed approach. We may consider changing the secondary index table
storage file format later.
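A minimal sketch of such a switch, assuming a placeholder property key (the key
name below is only for discussion and is not an existing carbon property;
CarbonProperties is the existing utility):

import org.apache.carbondata.core.util.CarbonProperties;

public class SIPruningSwitchSketch {
  // Placeholder key, not an existing carbon property.
  private static final String SI_PRUNING_ENABLE = "carbon.presto.si.pruning.enable";

  // The new code path would consult this flag and fall back to the present
  // behaviour (no SI lookup) when it is disabled.
  public static boolean isSiPruningEnabled() {
    String value = CarbonProperties.getInstance().getProperty(SI_PRUNING_ENABLE, "false");
    return Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    CarbonProperties.getInstance().addProperty(SI_PRUNING_ENABLE, "true");
    System.out.println("SI pruning enabled: " + isSiPruningEnabled());
  }
}
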

Please let me know your opinion/suggestion on whether we should go with Option 1,
Option 2, both Option 1 and Option 2, or any other approach.


Thanks,
Venu Reddy





Re: [Discussion] Taking the inputs for Segment Interface Refactoring

2021-01-05 Thread Ajantha Bhat
Hi all,

As per the online meeting, I have thought through the design of the transaction
manager as well.
The transaction manager can be responsible for:
a. Cross-table transactions --> expose start transaction, commit transaction, and
rollback transaction to the user/application. Commit the table status files of all
tables at once, only if the current transaction succeeded on all tables.
b. Table-level versioning/MVCC for time travel --> internally get the transaction
id (version id) for each table-level operation (DDL/DML), write multiple table
status files (one per version) for time travel, and also keep one transaction file.
A rough interface sketch for (a) and (b) is given below.
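This sketch is only to aid the discussion; none of the names below exist in the
codebase yet:

import java.util.List;

// Hypothetical interface for the proposed transaction manager; all names are
// placeholders for discussion.
public interface TransactionManagerSketch {

  // (a) cross-table transaction exposed to the user/application.
  String startTransaction(List<String> tableIds);

  // Commit the table status of all participating tables only if the current
  // transaction succeeded on every table.
  void commitTransaction(String transactionId);

  void rollbackTransaction(String transactionId);

  // (b) table-level versioning/MVCC: each table-level DDL/DML obtains a version id,
  // and a versioned table status file is written for time travel.
  long nextVersion(String tableName);

  // Path of the table status file written for a given version of a table.
  String versionedTableStatusPath(String tableName, long versionId);
}
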

However, combining the transaction manager with the segment interface refactoring
work would complicate the design and be hard to handle in one PR. So, I want to
handle it step by step.
*To take up the segment interface refactoring first, please go through the document
attached in the previous mail (also present in the JIRA) and provide your opinion
(+1) to go ahead.*

Thanks,
Ajantha


On Fri, Nov 13, 2020 at 2:43 PM Ajantha Bhat  wrote:

> Hi Everyone.
> Please find the design of the refactored segment interfaces in the attached
> document. You can also check the same V3 version attached in the JIRA [
> https://issues.apache.org/jira/browse/CARBONDATA-2827]
>
> It is based on some recent discussions and the previous discussions of
> 2018
> [
> http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/Discussion-Refactor-Segment-Management-Interface-td58926.html
> ]
>
> *Note:*
> 1) As the pre-aggregate feature is no longer present and MV and SI support
> incremental loading, the previous problem of committing all child table statuses
> at once may no longer apply, so those interfaces have been removed.
> 2) All of this will be developed in a new module called *carbondata-acid*, and
> the other required modules will depend on it.
> 3) Once this is implemented, we can discuss the design of time travel on top of
> it [transaction manager implementation and writing multiple table status files
> with versioning].
>
> Please go through it and give your inputs.
>
> Thanks,
> Ajantha
>
> On Mon, Oct 19, 2020 at 9:43 AM David CaiQiang 
> wrote:
>
>> Before starting to refactor the segment interface, I list the segment-related
>> features as follows.
>>
>> [table related]
>> 1. get lock for table
>>lock for tablestatus
>>lock for updatedTablestatus
>> 2. get lastModifiedTime of table
>>
>> [segment related]
>> 1. segment datasource
>>datasource: file format,other datasource
>>fileformat: carbon,parquet,orc,csv..
>>catalog type: segment, external segment
>> 2. data load etl(load/insert/add_external_segment/insert_stage)
>>write segment for batch loading
>>add external segment by using external folder path for mixed file
>> formatted table
>>    append streaming segment for spark structured streaming
>>insert_stage for flink writer
>> 3. data query
>>segment properties and schema
>>segment level index cache and pruning
>>cache/refresh block/blocklet index cache if needed by segment
>>read segments to a dataframe/rdd
>> 4. segment management
>>new segment id for loading/insert/add_external_segment/insert_stage
>>create global segment identifier
>>show[history]/delete segment
>> 5. stats
>>collect dataSize and indexSize of the segment
>>lastModifiedTime, start/end time, update start/end time
>>fileFormat
>>status
>> 6. segment level lock for supporting concurrent operations
>> 7. get tablestatus storage factory
>>storage solution 1): use file system by default
>>storage solution 2): use hive metastore or db
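>>
>> A rough sketch of item 7, assuming a simple factory over the two storage
>> solutions (all names below are placeholders; only the tablestatus location
>> under <tablePath>/Metadata follows the current layout):
>>
>>   import java.io.IOException;
>>   import java.nio.charset.StandardCharsets;
>>   import java.nio.file.Files;
>>   import java.nio.file.Paths;
>>
>>   // Hypothetical abstraction over where tablestatus is persisted.
>>   interface TableStatusStore {
>>     String readTableStatus(String tablePath) throws IOException;
>>     void writeTableStatus(String tablePath, String content) throws IOException;
>>   }
>>
>>   // storage solution 1): file system backed store (default)
>>   class FileSystemTableStatusStore implements TableStatusStore {
>>     public String readTableStatus(String tablePath) throws IOException {
>>       return new String(
>>           Files.readAllBytes(Paths.get(tablePath, "Metadata", "tablestatus")),
>>           StandardCharsets.UTF_8);
>>     }
>>     public void writeTableStatus(String tablePath, String content) throws IOException {
>>       Files.write(Paths.get(tablePath, "Metadata", "tablestatus"),
>>           content.getBytes(StandardCharsets.UTF_8));
>>     }
>>   }
>>
>>   // storage solution 2) (hive metastore or db) would be another implementation
>>   // of the same interface, selected by a factory based on configuration.
>>   final class TableStatusStoreFactory {
>>     static TableStatusStore create(String backend) {
>>       // only the default file system store is sketched here
>>       return new FileSystemTableStatusStore();
>>     }
>>   }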
>>
>> [table status related]:
>> 1. record new LoadMetadataDetails
>>  loading/insert/compaction start/end
>>  add external segment start/end
>>  insert stage
>>
>> 2. update LoadMetadataDetails
>>   compaction
>>   update/delete
>>   drop partition
>>   delete segment
>>
>> 3. read LoadMetadataDetails
>>   list all/valid/invalid segment
>>
>> 4. backup and history
>>
>> [segment file related]
>> 1. write new segment file
>>    generate segment file name
>>      better to use a new timestamp to generate the segment file name for each
>>      write, to avoid overwriting a segment file with the same name
>>    write segment file
>>    merge temp segment file
>> 2. read segment file
>>readIndexFiles
>>readIndexMergeFiles
>>getPartitionSpec
>> 3. update segment file
>>update
>>merge index
>>drop partition
>>
>> [clean files related]
>> 1. clean stale files for a successful segment operation
>>    data deletion should be delayed for a period of time (maybe the query
>>    timeout interval) to avoid deleting files immediately (except for drop
>>    table/partition and force clean files)
>>    includes data file, index file, segment file, tablestatus file
>>    impacted operation: mergeIndex
>> 2. clean stale files for a failed segment operation immediately
>>
>>
>>
>>
>>
>> -
>> Best Regards
>> David Cai

Re: [Discussion] Presto Queries leveraging Secondary Index

2021-01-05 Thread Ajantha Bhat
Hi Venu,

a. *Presto CarbonData already supports reading the bloom index*, so I want to
correct your initial statement that the "Presto engine does not make use of
indexes (SI, Bloom, etc.) in query processing".

b. The main difference between Option 1 and Option 2 is that *Option 1 is
multi-threaded and Option 2 is distributed.*
The performance of Option 1 will be poor. Hence, even though we need a Spark Index
Server cluster (currently Presto CarbonData always needs a Spark cluster to write
carbondata), *I want to go with Option 2.*

c. For Option 2, the implementation cannot be done the same way as bloom, because
we need to read the whole SI table with the filter. So I suggest creating a
dataframe by querying the SI table (which goes through CarbonScanRDD), and once
you get the matched blocklets, building the splits for the main table from them,
based on block-level or blocklet-level task distribution.
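
A small sketch of (c), using plain Spark SQL over the SI table; the table and
column names are placeholders, while SparkSession/Dataset are the real Spark APIs:

import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch for (c): querying a carbon SI table through Spark goes via CarbonScanRDD;
// the collected position references are then used to build the main-table splits
// with block- or blocklet-level task distribution. Names are placeholders.
public class SITableLookupSketch {
  public static List<Row> lookupPositionReferences(SparkSession spark,
      String siTableName, String indexColumn, String filterValue) {
    Dataset<Row> matched = spark.sql(
        "SELECT positionReference FROM " + siTableName
            + " WHERE " + indexColumn + " = '" + filterValue + "'");
    return matched.collectAsList();
  }
}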

Thanks,
Ajantha
