Hi community,

Some more ideas on the loading process. In my opinion the ideal flow should be
as follows, please comment:

1. input step:
- Parse the input data, either CSV or DataFrame; both are converted into
CarbonRow.
- Buffer the rows into CarbonRowBatch
- Prefetch rows
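
To illustrate, here is a minimal sketch of the input step, assuming simplified
CarbonRow/CarbonRowBatch shapes (the real classes carry more than shown here,
such as schema information):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// simplified row holding the parsed field values
class CarbonRow {
  final Object[] data;
  CarbonRow(Object[] data) { this.data = data; }
}

// buffer of rows handed from the input step to the convert step
class CarbonRowBatch {
  private final List<CarbonRow> rows = new ArrayList<>();
  void addRow(CarbonRow row) { rows.add(row); }
  int size() { return rows.size(); }
  Iterator<CarbonRow> iterator() { return rows.iterator(); }
}

// input step: parse CSV lines into CarbonRow and buffer them into batches;
// prefetching can be added by filling batches in a background thread and
// handing them over through a bounded queue
class InputStep {
  private static final int BATCH_SIZE = 32000;

  Iterator<CarbonRowBatch> execute(final Iterator<String> csvLines) {
    return new Iterator<CarbonRowBatch>() {
      public boolean hasNext() { return csvLines.hasNext(); }
      public CarbonRowBatch next() {
        CarbonRowBatch batch = new CarbonRowBatch();
        while (csvLines.hasNext() && batch.size() < BATCH_SIZE) {
          batch.addRow(new CarbonRow(csvLines.next().split(",")));
        }
        return batch;
      }
      public void remove() { throw new UnsupportedOperationException(); }
    };
  }
}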

2. convert step:
- Convert fields based on the column category:
  - for global dictionary encoded columns, convert them into dictionary IDs.
  - for primitive type columns, keep the original type (no conversion).
  - for string and variable-length data type columns (like complex columns),
convert to LV (length-value) encoded byte arrays.

- Convert input rows to columnar layout while reading from the previous step:
create one ColumnPage per column, and one DataPage for all columns.

- The DataPage should store the key columns specified by SORT_COLUMNS together
in one vector, so that sorting is CPU-cache friendly.
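
To make the layout concrete, a minimal sketch assuming the ColumnPage/DataPage
shapes described above (the exact field layout is only an illustration):

// one column of a page; values kept as encoded byte arrays from the convert step
class ColumnPage {
  final byte[][] values;
  ColumnPage(int pageSize) { this.values = new byte[pageSize][]; }
  void putValue(int rowId, byte[] value) { values[rowId] = value; }
}

// all columns of a page; SORT_COLUMNS keys are packed row-wise in one vector
// so the sorter touches contiguous memory, non-key columns stay column-wise
class DataPage {
  final byte[][] keyVector;
  final ColumnPage[] nonKeyColumns;

  DataPage(int pageSize, int numNonKeyColumns) {
    this.keyVector = new byte[pageSize][];
    this.nonKeyColumns = new ColumnPage[numNonKeyColumns];
    for (int i = 0; i < numNonKeyColumns; i++) {
      nonKeyColumns[i] = new ColumnPage(pageSize);
    }
  }
}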

3. index-build step:
- Based on the configuration, this step can be present or skipped: if
SORT_COLUMNS is empty, no sort is needed and the step is omitted.

- Add the DataPage into a Sorter; the sorter sorts incoming DataPages on the
fly. The sort result is an array of rowIds (pageId + offset). Avoid using the
JDK's Arrays.sort.

- The sorter should support batch sort or merge sort. For merge sort, multiple
spilling strategies can be implemented.
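
To make the rowId idea concrete, a minimal sketch of the sorter contract, with
a rowId packed as (pageId, offset) in a long; the names are assumptions, not an
existing carbon API:

// rowId packed as (pageId << 32) | offset, so a sort result is just a long[]
class RowId {
  static long pack(int pageId, int offset) {
    return ((long) pageId << 32) | (offset & 0xFFFFFFFFL);
  }
  static int pageId(long rowId) { return (int) (rowId >>> 32); }
  static int offset(long rowId) { return (int) rowId; }
}

interface DataPageSorter {
  // feed one DataPage; the sorter may sort it eagerly (batch sort) or buffer
  // and spill sorted runs to disk (merge sort)
  void addPage(int pageId, DataPage page);

  // return the globally sorted rowIds, each packed as (pageId, offset)
  long[] sortedRowIds();
}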

4. write step:
- Calculate the start/end MDKey over the ColumnPage.

- Calculate the statistics of the ColumnPage. Currently in CarbonData the
min/max stats are hard coded. As a first step of extension, we want to provide
more stats, such as histogram and bloom filter, for the user to choose from. An
optional field needs to be added to the DataChunk struct in the thrift
definition.

- Encode and compress each column into a byte array. To make encoding more
transparent to the user and extensible, we should first decouple the data type
from the available codec options, so the code can choose a codec for a specific
data type by some strategy. We can start with a default strategy, and later
open this interface for developers to extend (say, a developer can choose the
codec based on heuristic rules derived from the data distribution). See the
sketch after this step.

- After each column is encoded and compressed, follow the current
Producer-Consumer logic to write to disk.
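
To illustrate the decoupling of data type and codec choice, here is a minimal
sketch; all interface and class names below are assumptions for discussion, not
the existing carbon encoding API:

enum DataType { INT, LONG, DOUBLE, STRING }

interface ColumnCodec {
  // codec name, to be recorded in the file format metadata
  String getName();
  byte[] encode(byte[][] columnValues);
}

// strategy that picks a codec for a column, possibly using page statistics
interface CodecStrategy {
  ColumnCodec select(DataType dataType, Object min, Object max);
}

// default strategy: choose by data type only; heuristic strategies based on
// data distribution can be plugged in later through the same interface
class DefaultCodecStrategy implements CodecStrategy {
  public ColumnCodec select(DataType dataType, Object min, Object max) {
    // placeholder codec that simply concatenates values; real codecs would
    // apply RLE, delta encoding, etc. per data type
    return new ColumnCodec() {
      public String getName() { return "PLAIN"; }
      public byte[] encode(byte[][] columnValues) {
        int total = 0;
        for (byte[] v : columnValues) total += v.length;
        byte[] out = new byte[total];
        int pos = 0;
        for (byte[] v : columnValues) {
          System.arraycopy(v, 0, out, pos, v.length);
          pos += v.length;
        }
        return out;
      }
    };
  }
}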


As the bottleneck will be the merge sort, some thoughts on the merge sort
strategy:
1. spill both key columns and non-key columns in sorted order
2. spill key columns in sorted order and non-key columns in original order, and
also spill the rowIds. Apply the rowId mapping only in the merge stage. This
avoids some random memory access on non-key columns, and is good if there are
many non-key columns
3. do not spill key columns, and spill non-key columns in original order. This
is good as we can keep more key columns in memory
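
To make strategy 2 concrete, a minimal sketch that reuses the DataPage/RowId
sketches above; the rowId mapping is only followed while merging (names are
assumptions):

// one spilled sorted run: keys and rowIds are written in sorted order,
// non-key columns stay in their original page order
class SpilledRun {
  byte[][] sortedKeys;
  long[] sortedRowIds;
}

class NonKeyResolver {
  // during the merge, look up a non-key value by following the rowId mapping
  static byte[] value(DataPage[] pages, int columnIndex, long rowId) {
    DataPage page = pages[RowId.pageId(rowId)];
    return page.nonKeyColumns[columnIndex].values[RowId.offset(rowId)];
  }
}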


Regards,
Jacky


> On 22 Apr 2017, at 8:08 PM, Liang Chen <chenliang6...@gmail.com> wrote:
> 
> Jacky, thank you for listing these constructive improvements to data loading.
> 
> Agree to consider all these improvement points; only the below one I have
> some concerns about.
> Before considering open interfaces for data loading, we need to more
> clearly define what different roles block/blocklet/page play. Then we
> could consider some interfaces at block level, some interfaces at
> blocklet level, and some at page level.
> 
> Let us take up these improvements in the coming release.
> --------------------------------------------------------------------------------------------------------------------
> improvement 3: we want to open up some interfaces for letting developer to
> add more page encoding, statistics, page compression. These interface will
> be like callbacks, so developer can write new encoding/statistics/compression
> method and carbon loading process will invoke it in this step. This
> interface will be like:
> 
> Regards
> Liang
> 
> 2017-04-21 14:50 GMT+08:00 Jacky Li <jacky.li...@qq.com>:
> 
>> I want to propose following improvement on data loading process:
>> 
>> Currently different steps are using different data layouts in CarbonRow,
>> and it converts back and forth in different steps. It is not easy for
>> developers to understand the data structure used in each step, and it
>> increases the memory requirement as it is doing unnecessary data copying in
>> some steps. So, I suggest to improve it as following
>> 
>>   1) input step: read input and create a CarbonRow with all fields are
>> string type
>> 
>>   2) convert step: convert string to byte[] according to its data type,
>> this step has a compression effect on the input row so it is good for saving
>> memory and it also takes care of the null value
>>        if it is dictionary dimension then convert to surrogate key;
>>        if it is no dictionary then convert to byte[] representation;
>>        if it is complex dimension, then convert to byte[] representation;
>>        if it is measure then convert to Object, like Integer, Long,
>> Double, according to schema  —> change to byte[] instead of storing Object
>> to avoid boxing/unboxing and save memory
>> 
>>        The conversion happens by updating the field in CarbonRow in
>> place; there should be no new CarbonRow created. However, there is a copy
>> operation of the input CarbonRow, for bad record handling  —>  do not copy
>> the row, convert it back to the original value if it is a bad record
>> 
>>   3) sort step:
>>      improvement 1: sort the collected input CarbonRow. Currently this is
>> done by copying the row object into internal buffer and sort is done on the
>> buffer. —> to avoid the copying of the CarbonRow, we should create this
>> buffer (with RowID) in input step, and only output the sorted RowID (by
>> swapping its value in the RowID array) according to its value. If it is a
>> merge sort, then write to file based on this sorted RowID array when
>> spilling to disk. So no copy of CarbonRow is required.
>> 
>>     improvement 2: when spilling to disk, currently it changes the field
>> order in CarbonRow, it is writing as a 3-elements array, [global dictionary
>> dimension, plain dimension and complex dimension, measure columns] , this
>> is because the merger is expecting it like this —> I think this is
>> unnecessary, we can add serialization/deserialization capability in
>> CarbonRow and use CarbonRow instead. In the case of no-sort table, it also
>> avoid this conversion in write step.
>> 
>>   4) write step:
>>      currently it will collect one page of data (32K rows) and start a
>> Producer which actually is the encode process of one page. In order to
>> support parallel processing, after the page data is encoded then put it to
>> a queue which will be taken by the Consumer, the Consumer will collect
>> pages up to one blocklet size (configurable, say 64MB), and write to
>> CarbonData files.
>> 
>>      improvement 1: there is an unnecessary data copy and re-ordering of
>> the fields of the row. it converts the row to: [measure columns, plain
>> dimension and complex dimension,  global dictionary dimension] it is
>> different from what sort step outputs. —> so suggest to use CarbonRow only.
>> no new row object should be created here.
>> 
>>      improvement 2: there are multiple traversal of the page data in the
>> code currently —> we should change to, firstly convert the CarbonRow to
>> ColumnarPage which is the columnar representation for all columns in one
>> page, and collect the start/end key and statistics when doing this columnar
>> conversion. Then apply inverted index, RLE, compression process based on
>> ColumnarPage  object.
>> 
>>      improvement 3: we want to open up some interfaces for letting
>> developer to add more page encoding, statistics, page compression. These
>> interface will be like callbacks, so developer can write new
>> encoding/statistics/compression method and carbon loading process will
>> invoke it in this step. This interface will be like:
>> 
>> /**
>> *  Codec for a column page, implementation should not keep state across
>> pages,
>> *  caller will use the same object to encode multiple pages
>> */
>> interface PageCodec {
>>  /** Codec name will be stored in BlockletHeader (DataChunk3) */
>>  String getName();
>>  void startPage(int pageID);
>>  void processColumn(ColumnBatch batch);
>>  byte[] endPage(int pageID);
>>  ColumnBatch decode(byte[] encoded);
>> }
>> 
>> /** Compressor of the page data, the flow is encode->compress, and
>> decompress->decode */
>> interface PageCompressor {
>>  /** Codec name will be stored in BlockletHeader (DataChunk3) */
>>  String getName();
>>  byte[] compress(byte[] encodedData);
>>  byte[] decompress(byte[] data);
>> }
>> 
>> /** Calculate the statistics for a column page and blocklet */
>> interface Statistics {
>>  /** Statistics name will be stored in BlockletHeader (DataChunk3) */
>>  String getName();
>>  void startPage(int pageID);
>>  void endPage(int pageID);
>>  void startBlocklet(int blockletID);
>>  void endBlocklet(int blockletID);
>> 
>>  /** Update the stats for the input batch */
>>  void update(ColumnBatch batch);
>> 
>>  /** Output will be written to DataChunk2 in BlockletHeader (DataChunk3) */
>>  int[] getPageStatistics();
>> 
>>  /** Output will be written to Footer */
>>  int[] getBlockletStatistics();
>> }
>> 
>> And, there should be a partition step adding somewhere to support
>> partition feature (CARBONDATA-910), and it depends on whether we implement
>> this partition shuffling in spark layer or carbon layer. (before input step
>> or after conversion step). What is the current idea of this? @CaiQiang
>> @Lionel
>> 
>> What you think about these improvements?
>> 
>> Regards,
>> Jacky
>> 
>> 
>> 
>> 
>> 
> 
> 
> -- 
> Regards
> Liang
> 


