[ 
https://issues.apache.org/jira/browse/HIVE-10165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elliot West updated HIVE-10165:
-------------------------------
    Description: 
h3. Overview
I'd like to extend the 
[hive-hcatalog-streaming|https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest]
 API so that it also supports the writing of record updates and deletes in 
addition to the already supported inserts.

h3. Motivation
We have many Hadoop processes outside of Hive that merge changed facts into 
existing datasets. Traditionally we achieve this by reading in a ground-truth 
dataset and a modified dataset, grouping by a key, sorting by a sequence, and 
then applying a function to determine the inserted, updated, and deleted rows. 
However, in our current scheme we must rewrite all partitions that may 
potentially contain changes. In practice the number of mutated records is very 
small when compared with the records contained in a partition. This approach 
results in a number of operational issues:
* Excessive amount of write activity required for small data changes.
* Downstream applications cannot robustly read these datasets while they are 
being updated.
* Due to the scale of the updates (hundreds of partitions), the scope for 
contention is high. 

I believe we can address this problem by instead writing only the changed 
records to a Hive transactional table. This should drastically reduce the 
amount of data that we need to write and also provide a means for managing 
concurrent access to the data. Our existing merge processes can read and retain 
each record's {{ROW_ID}}/{{RecordIdentifier}} and pass this through to an 
updated form of the hive-hcatalog-streaming API which will then have the 
required data to perform an update or insert in a transactional manner. 
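
To make the intended flow concrete, a minimal sketch of the merge step is shown below. All class and method names here are illustrative only, assuming some mutation-capable writer exposed by the extended API; the key point is that each changed record carries the {{RecordIdentifier}} that was read from the transactional table:

{code:java}
// Hypothetical sketch - the writer interface and field names are illustrative,
// not the proposed API itself. It shows a merge job applying only the changed
// rows, each carrying the RecordIdentifier read from the ground-truth table.
import org.apache.hadoop.hive.ql.io.RecordIdentifier;

public class MergeSketch {

  enum ChangeType { INSERT, UPDATE, DELETE }

  /** A changed fact paired with the identifier of the row it replaces (null for inserts). */
  static class ChangedRow {
    ChangeType changeType;
    RecordIdentifier rowId;
    Object record;            // e.g. a POJO or Tuple produced by the merge job
  }

  /** Placeholder for a mutation-capable writer from the extended streaming API. */
  interface MutationWriter {
    void insert(Object record) throws Exception;
    void update(Object record) throws Exception;  // record carries its rowId
    void delete(Object record) throws Exception;  // only the rowId is really needed
  }

  /** Writes only the mutated rows instead of rewriting whole partitions. */
  static void applyChanges(Iterable<ChangedRow> changes, MutationWriter writer)
      throws Exception {
    for (ChangedRow change : changes) {
      switch (change.changeType) {
        case INSERT: writer.insert(change.record); break;
        case UPDATE: writer.update(change.record); break;
        case DELETE: writer.delete(change.record); break;
      }
    }
  }
}
{code}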

h3. Benefits
* Enables the creation of large-scale dataset merge processes  
* Opens up Hive transactional functionality in an accessible manner to 
processes that operate outside of Hive.


  was:
h3. Overview
I'd like to extend the 
[hive-hcatalog-streaming|https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest]
 API so that it also supports the writing of record updates and deletes in 
addition to the already supported inserts.

h3. Motivation
We have many Hadoop processes outside of Hive that merge changed facts into 
existing datasets. Traditionally we achieve this by reading in a ground-truth 
dataset and a modified dataset, grouping by a key, sorting by a sequence, and 
then applying a function to determine the inserted, updated, and deleted rows. 
However, in our current scheme we must rewrite all partitions that may 
potentially contain changes. In practice the number of mutated records is very 
small when compared with the records contained in a partition. This approach 
results in a number of operational issues:
* Excessive amount of write activity required for small data changes.
* Downstream applications cannot robustly read these datasets while they are 
being updated.
* Due to the scale of the updates (hundreds of partitions), the scope for 
contention is high. 

I believe we can address this problem by instead writing only the changed 
records to a Hive transactional table. This should drastically reduce the 
amount of data that we need to write and also provide a means for managing 
concurrent access to the data. Our existing merge processes can read and retain 
each record's {{ROW_ID}}/{{RecordIdentifier}} and pass this through to an 
updated form of the hive-hcatalog-streaming API which will then have the 
required data to perform an update or insert in a transactional manner. 

h3. Benefits
* Enables the creation of large-scale dataset merge processes  
* Opens up Hive transactional functionality in an accessible manner to 
processes that operate outside of Hive.

h3. Implementation
Our changes do not break the existing API contracts. Instead our approach has 
been to consider the functionality offered by the existing API and our proposed 
API as fulfilling separate and distinct use-cases. The existing API is 
primarily focused on the task of continuously writing large volumes of new data 
into a Hive table for near-immediate analysis. Our use-case, however, is more 
concerned with the frequent but not continuous ingestion of mutations to a 
Hive table from some ETL merge process. Consequently, we feel it is justifiable 
to add our new functionality via an alternative set of public interfaces and 
leave the existing API as is. This keeps both APIs clean and focused at the 
expense of presenting additional options to potential users. Wherever possible, 
shared implementation concerns have been factored out into abstract base 
classes that are open to third-party extension. A detailed breakdown of the 
changes is as follows:

* We've introduced a public {{RecordMutator}} interface whose purpose is to 
expose insert/update/delete operations to the user. This is a counterpart to 
the write-only {{RecordWriter}}. We've also factored out life-cycle methods 
common to these two interfaces into a super {{RecordOperationWriter}} 
interface. Note that the row representation has been changed from {{byte[]}} to 
{{Object}}. Within our data processing jobs our records are often available in 
a strongly typed and decoded form such as a POJO or a Tuple object. Therefore 
it seems to make sense that we are able to pass these through to the 
{{OrcRecordUpdater}} without having to go through a {{byte[]}} encoding step. 
This of course still allows users to use {{byte[]}} if they wish (rough 
sketches of these and the other additions described below appear after this list).
* The introduction of {{RecordMutator}} requires that insert/update/delete 
operations are then also exposed on a {{TransactionBatch}} type. We've done 
this with the introduction of a public {{MutatorTransactionBatch}} interface 
which is a counterpart to the write-only {{TransactionBatch}}. We've also 
factored out life-cycle methods common to these two interfaces into a super 
{{BaseTransactionBatch}} interface. 
* Functionality that would be shared by implementations of both 
{{RecordWriters}} and {{RecordMutators}} has been factored out of 
{{AbstractRecordWriter}} into a new abstract base class 
{{AbstractOperationRecordWriter}}. The visibility is such that it is open to 
extension by third parties. The {{AbstractOperationRecordWriter}} also permits 
the setting of the {{AcidOutputFormat.Options#recordIdColumn()}} (defaulted to 
{{-1}}) which is a requirement for enabling updates and deletes. Additionally, 
these options are now fed an {{ObjectInspector}} via an abstract method so that 
a {{SerDe}} is not mandated (it was not required for our use-case). The 
{{AbstractRecordWriter}} is now much leaner, handling only the extraction of 
the {{ObjectInspector}} from the {{SerDe}}.
* A new abstract class, {{AbstractRecordMutator}} has been introduced to act as 
the base of concrete {{RecordMutator}} implementations. The key functionality 
added by this class is a validation step on {{update}} and {{delete}} to ensure 
that the record specified contains a {{RecordIdentifier}}. This was added as it 
is not explicitly checked for elsewhere and would otherwise generate an NPE 
deep down in {{OrcRecordUpdater}}.
* There are now two private transaction batch implementations: 
{{HiveEndPoint.TransactionBatchImpl}} and its insert/update/delete counterpart: 
{{HiveEndPoint.MutationTransactionBatchImpl}}. As you might expect, 
{{TransactionBatchImpl}} must delegate to a {{RecordWriter}} implementation 
whereas {{MutationTransactionBatchImpl}} must delegate to a {{RecordMutator}} 
implementation. Shared transaction batch functionality has been factored out 
into an {{AbstractTransactionBatch}} class. In the case of 
{{MutationTransactionBatchImpl}} we've added a check to ensure that an error 
occurs should a user submit multiple types of operation to the same batch, as 
we've found that this can lead to inconsistent data being returned from the 
underlying table when read from Hive.
* To enable the usage of the different transaction batch variants we've added 
an additional transaction batch factory method to {{StreamingConnection}} and 
provided a suitable implementation in {{HiveEndPoint}}. It's worth noting that 
{{StreamingConnection}} is the only public facing component of the API contract 
that contains references to both the existing writer scheme and our mutator 
scheme.
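
To make the shape of the new public contracts concrete, here is a rough sketch of the interfaces named in the first two points. The interface names come from the description above, but the method signatures are inferred rather than copied from the patch and may differ from it:

{code:java}
// Sketch only - interface names come from the description above; the method
// signatures are inferred and may not match the attached patch exactly.
// Rows are Object rather than byte[] so callers can pass POJOs/Tuples directly.

/** Life-cycle methods shared by the write-only and mutating record writers. */
interface RecordOperationWriter {
  void flush() throws Exception;
  void clear() throws Exception;
  void closeBatch() throws Exception;
}

/** Mutating counterpart to the existing write-only RecordWriter. */
interface RecordMutator extends RecordOperationWriter {
  void insert(long transactionId, Object record) throws Exception;
  void update(long transactionId, Object record) throws Exception;  // record must carry a RecordIdentifier
  void delete(long transactionId, Object record) throws Exception;  // record must carry a RecordIdentifier
}

/** Life-cycle methods shared by the write-only and mutating transaction batches. */
interface BaseTransactionBatch {
  void beginNextTransaction() throws Exception;
  void commit() throws Exception;
  void abort() throws Exception;
  void close() throws Exception;
}

/** Mutating counterpart to the existing write-only TransactionBatch. */
interface MutatorTransactionBatch extends BaseTransactionBatch {
  void insert(Object record) throws Exception;
  void update(Object record) throws Exception;
  void delete(Object record) throws Exception;
}
{code}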
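
On the writer side, the configuration and validation described above might look roughly as follows. This is a sketch that collapses the {{AbstractOperationRecordWriter}} and {{AbstractRecordMutator}} concerns into one class for brevity; the {{extractRecordIdentifier}}, {{getObjectInspector}}, and {{getRecordIdColumn}} hooks are illustrative assumptions, and the real patch may wire the {{ObjectInspector}} and options differently:

{code:java}
// Sketch only - illustrates the recordIdColumn/ObjectInspector wiring and the
// RecordIdentifier validation; the abstract hook methods are assumptions.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

abstract class AbstractRecordMutatorSketch {

  /** Options for the underlying record updater; recordIdColumn enables update/delete. */
  protected AcidOutputFormat.Options createOptions(Configuration conf, int bucketId,
      long minTxnId, long maxTxnId) {
    return new AcidOutputFormat.Options(conf)
        .inspector(getObjectInspector())       // supplied by a subclass; no SerDe is mandated
        .bucket(bucketId)
        .minimumTransactionId(minTxnId)
        .maximumTransactionId(maxTxnId)
        .recordIdColumn(getRecordIdColumn());  // would default to -1 for insert-only writers
  }

  public void update(long transactionId, Object record) throws Exception {
    validateRecordIdentifier(record, "update");
    // ... delegate to the underlying record updater ...
  }

  public void delete(long transactionId, Object record) throws Exception {
    validateRecordIdentifier(record, "delete");
    // ... delegate to the underlying record updater ...
  }

  /** Fail fast instead of surfacing an NPE from deep inside OrcRecordUpdater. */
  private void validateRecordIdentifier(Object record, String operation) {
    RecordIdentifier id = extractRecordIdentifier(record);
    if (id == null) {
      throw new IllegalArgumentException(
          operation + " requires a record that contains a RecordIdentifier: " + record);
    }
  }

  /** How the identifier is obtained from the caller's record representation (illustrative). */
  protected abstract RecordIdentifier extractRecordIdentifier(Object record);

  protected abstract ObjectInspector getObjectInspector();

  protected abstract int getRecordIdColumn();
}
{code}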
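
Finally, an end-to-end usage sketch from the caller's perspective, building on the interface sketch above. The {{fetchMutatorTransactionBatch}} name is a placeholder standing in for the additional factory method mentioned in the last point, not an existing API call, and the mutator and record arguments are likewise placeholders:

{code:java}
// Illustrative usage only - fetchMutatorTransactionBatch is a placeholder for
// the proposed factory method on StreamingConnection, not an existing API call.
import java.util.Arrays;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;

public class MutationClientSketch {

  public static void applyUpdates(RecordMutator mutator, Iterable<Object> updatedRecords)
      throws Exception {
    HiveEndPoint endPoint = new HiveEndPoint(
        "thrift://metastore:9083", "my_db", "my_acid_table", Arrays.asList("2015", "03"));
    StreamingConnection connection = endPoint.newConnection(true);
    try {
      // Hypothetical counterpart to the existing fetchTransactionBatch(int, RecordWriter).
      MutatorTransactionBatch batch = connection.fetchMutatorTransactionBatch(10, mutator);
      batch.beginNextTransaction();
      // A batch should contain one operation type only; mixing inserts, updates and
      // deletes in the same batch is rejected as it can lead to inconsistent reads.
      for (Object record : updatedRecords) {
        batch.update(record);  // each record carries its original RecordIdentifier
      }
      batch.commit();
      batch.close();
    } finally {
      connection.close();
    }
  }
}
{code}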

Please find these changes in the attached patch: [^HIVE-10165.0.patch].


> Improve hive-hcatalog-streaming extensibility and support updates and deletes.
> ------------------------------------------------------------------------------
>
>                 Key: HIVE-10165
>                 URL: https://issues.apache.org/jira/browse/HIVE-10165
>             Project: Hive
>          Issue Type: Improvement
>          Components: HCatalog
>    Affects Versions: 1.2.0
>            Reporter: Elliot West
>            Assignee: Elliot West
>              Labels: streaming_api
>         Attachments: HIVE-10165.0.patch, HIVE-10165.4.patch, 
> HIVE-10165.5.patch, HIVE-10165.6.patch, mutate-system-overview.png
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
