[ 
https://issues.apache.org/jira/browse/HIVE-10165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14554625#comment-14554625
 ] 

Elliot West commented on HIVE-10165:
------------------------------------

h3. Current status
I've had to change tack with this recently as I found that what I had built 
upon the existing API was not actually suited to the ETL merge use cases. 
Consider that the existing API is focused on the task of continuously writing 
small batches of new data and making that data available in Hive rapidly. 
Conversely, my use case is focused on infrequently writing large batches of 
changes that should only be available in Hive as a single batch or not at all. 
I've tried to summarise the differences:

h3. Use case comparison
||Attribute||Streaming case (current API)||Merge case (proposed API)||
|Ingest type|Data arrives continuously|Merges are performed periodically and the deltas are applied in a single batch|
|Transaction scope|Transactions are created for small batches of writes|The entire delta should be applied within a single transaction|
|Data availability|Surfaces new data to users frequently and quickly|Change sets should be applied atomically: either the effect of the delta is visible or it is not|
|Sensitive to record order|No, records do not have pre-existing {{lastTxnIds}} or {{bucketIds}}. Records are likely being written into a single partition (today's date, for example)|Yes, all mutated records have existing {{RecordIdentifiers}} and must be grouped by ({{partitionValues}}, {{bucketId}}) and sorted by {{lastTxnId}}. These record coordinates initially arrive in an effectively random order.|
|Impact of a write failure|The transaction can be aborted and the producer can choose to resubmit failed records as ordering is not important|Ingest for the respective group must be halted and failed records resubmitted to preserve the sequence|
|User perception of missing data|Data has not arrived yet → "latency?"|"This data is inconsistent: some records have been updated, but other related records have not" - consider here the classic transfer between bank accounts scenario|
|API end point scope|A given {{HiveEndPoint}} instance submits many transactions to a specific bucket, in a specific partition, of a specific table|An API is required that writes changes to an unknown set of buckets, of an unknown set of partitions, of a specific table (but perhaps more than one), within a single transaction|

I think this table highlights two key points:
# A merge is not that useful if it cannot be atomic (i.e. the entire delta is 
applied in a single transaction).
# The current streaming API is based on the premise that {{partitionValues}} 
and {{bucketIds}} are known before ingestion, and so the whole stack can be 
constructed with these as constants (see the usage sketch below). Transactions 
are a small-scale concern (small batches of writes) and are therefore not 
available to coordinate larger sets of operations across partitions and buckets.
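
For contrast, typical usage of the current API looks something like the sketch 
below (metastore URI, table, and column names are purely illustrative, and 
exception handling is omitted). Note how the partition is fixed at the point 
the {{HiveEndPoint}} is constructed and each transaction batch is scoped to 
that single endpoint:
{code}
import java.util.Arrays;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

    // Illustrative values only: the partition is baked into the endpoint up-front
    HiveEndPoint endPoint = new HiveEndPoint("thrift://metastore:9083", "db_name",
        "table_name", Arrays.asList("2015-05-21"));
    StreamingConnection connection = endPoint.newConnection(true);
    DelimitedInputWriter writer =
        new DelimitedInputWriter(new String[] { "id", "msg" }, ",", endPoint);

    // Small batches of transactions, each covering a handful of writes to the
    // single (table, partition) addressed by the endpoint
    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    txnBatch.beginNextTransaction();
    txnBatch.write("1,Hello streaming".getBytes());
    txnBatch.commit();
    txnBatch.close();

    connection.close();
{code}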

h3. Proposal
In summary, I do not believe that the current API can or should be bent to 
handle the merge case as I think it is a different animal. Instead I propose an 
alternate API where the transaction is the highest-level construct. It presents 
two core collaborators: a client ({{MutatorClient}}) that manages a 
long-running transaction, and workers ({{MutatorCoordinator}} instances) that 
coordinate mutations within the transaction via managed {{OrcRecordUpdater}} 
instances. The mutation workload can be scaled horizontally by partitioning 
records by ({{partitionValues}}, {{bucketId}}) across a number of workers:
{panel}
{code}
    // CLIENT/TOOL END
    //
    // Create a client to manage our transaction - singleton instance in the 
job client
    MutatorClient client = // a thing that knows how to get a transaction and 
manage a Hive lock

    // Get the transaction
    Transaction transaction = client.newTransaction();
    transaction.begin();

    // CLUSTER / WORKER END
    //
    // A job submitted to the cluster
    // The job partitions the data by (partitionValues, ROW__ID.bucketId)
    // and orders the groups by (ROW__ID.lastTransactionId)

    // One of these sits at the output of each of the job's tasks
    MutatorCoordinator coordinator = // a thing that knows how to read 
bucketIds, write records, and create OrcRecordUpdaters
    
    coordinator.insert(partitionValues1, record1);
    coordinator.update(partitionValues2, record2);
    coordinator.delete(partitionValues3, record3);
    // millions of operations

    coordinator.close();

    // CLIENT/TOOL END
    //
    // The tasks have completed, control is back at the tool

    transaction.commit();

    client.close();
{code}
{panel}
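To make the grouping and ordering requirement concrete, the worker job would 
shuffle on a composite key along the lines of the following. This is a 
hypothetical sketch only; the class and field names are illustrative and not 
part of the draft API:
{code}
import java.util.List;

// Hypothetical shuffle key for the merge job: group records by
// (partitionValues, bucketId) so that each coordinator receives exactly one
// bucket of one partition, and sort within each group by lastTransactionId so
// that mutations are applied in their original sequence.
public class MutationKey implements Comparable<MutationKey> {

  private final List<String> partitionValues;
  private final int bucketId;
  private final long lastTransactionId;

  public MutationKey(List<String> partitionValues, int bucketId, long lastTransactionId) {
    this.partitionValues = partitionValues;
    this.bucketId = bucketId;
    this.lastTransactionId = lastTransactionId;
  }

  @Override
  public int compareTo(MutationKey other) {
    int cmp = partitionValues.toString().compareTo(other.partitionValues.toString());
    if (cmp == 0) {
      cmp = Integer.compare(bucketId, other.bucketId);
    }
    if (cmp == 0) {
      cmp = Long.compare(lastTransactionId, other.lastTransactionId);
    }
    return cmp;
  }
}
{code}
In practice such a key would also need {{equals}}/{{hashCode}} and a 
Hadoop-serialisable form; the point is simply that the (group, sort) structure 
mirrors the {{RecordIdentifier}} coordinates described in the table above.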
h3. Relation to the current streaming API
I believe that there is some potential for reuse by factoring out common 
implementation code blocks into independent classes. I also believe this would 
improve the current API implementation by making it more amenable to 
lighter-weight unit testing. However, I do not think that the existing 
public-facing API can be applied to the mutation case without some API-breaking 
changes, so I think a distinct mutation API is the way forward.

h3. Next steps
Given the large-scale shift in focus, I'd like to gauge both the merit of this 
proposal and the appetite for moving forward with it. Is it technically feasible?

I have a _draft_ API and implementation in a 
[branch|https://github.com/HotelsDotCom/hive/tree/mutate/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate]
 (I will update the patch shortly). If you believe this is worth pursuing then 
I'll begin work on refactoring the implementation of the current API to avoid 
duplication between the implementations of the two APIs. I will also need to 
invest quite a bit of time in tests.

> Improve hive-hcatalog-streaming extensibility and support updates and deletes.
> ------------------------------------------------------------------------------
>
>                 Key: HIVE-10165
>                 URL: https://issues.apache.org/jira/browse/HIVE-10165
>             Project: Hive
>          Issue Type: Improvement
>          Components: HCatalog
>            Reporter: Elliot West
>            Assignee: Elliot West
>              Labels: streaming_api
>         Attachments: HIVE-10165.0.patch
>
>
> h3. Overview
> I'd like to extend the 
> [hive-hcatalog-streaming|https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest]
>  API so that it also supports the writing of record updates and deletes in 
> addition to the already supported inserts.
> h3. Motivation
> We have many Hadoop processes outside of Hive that merge changed facts into 
> existing datasets. Traditionally we achieve this by: reading in a 
> ground-truth dataset and a modified dataset, grouping by a key, sorting by a 
> sequence and then applying a function to determine inserted, updated, and 
> deleted rows. However, in our current scheme we must rewrite all partitions 
> that may potentially contain changes. In practice the number of mutated 
> records is very small when compared with the records contained in a 
> partition. This approach results in a number of operational issues:
> * Excessive amount of write activity required for small data changes.
> * Downstream applications cannot robustly read these datasets while they are 
> being updated.
> * Due to the scale of the updates (hundreds of partitions), the scope for 
> contention is high.
> I believe we can address this problem by instead writing only the changed 
> records to a Hive transactional table. This should drastically reduce the 
> amount of data that we need to write and also provide a means for managing 
> concurrent access to the data. Our existing merge processes can read and 
> retain each record's {{ROW__ID}}/{{RecordIdentifier}} and pass this through to 
> an updated form of the hive-hcatalog-streaming API which will then have the 
> required data to perform an update or insert in a transactional manner. 
> h3. Benefits
> * Enables the creation of large-scale dataset merge processes  
> * Opens up Hive transactional functionality in an accessible manner to 
> processes that operate outside of Hive.
> h3. Implementation
> Our changes do not break the existing API contracts. Instead our approach has 
> been to consider the functionality offered by the existing API and our 
> proposed API as fulfilling separate and distinct use-cases. The existing API 
> is primarily focused on the task of continuously writing large volumes of new 
> data into a Hive table for near-immediate analysis. Our use-case, however, is 
> concerned more with the frequent but not continuous ingestion of mutations to 
> a Hive table from some ETL merge process. Consequently we feel it is 
> justifiable to add our new functionality via an alternative set of public 
> interfaces and leave the existing API as is. This keeps both APIs clean and 
> focused at the expense of presenting additional options to potential users. 
> Wherever possible, shared implementation concerns have been factored out into 
> abstract base classes that are open to third-party extension. A detailed 
> breakdown of the changes is as follows:
> * We've introduced a public {{RecordMutator}} interface whose purpose is to 
> expose insert/update/delete operations to the user. This is a counterpart to 
> the write-only {{RecordWriter}}. We've also factored out life-cycle methods 
> common to these two interfaces into a super {{RecordOperationWriter}} 
> interface. Note that the row representation has been changed from {{byte[]}} 
> to {{Object}}. Within our data processing jobs our records are often 
> available in a strongly typed and decoded form such as a POJO or a Tuple 
> object. Therefore it seems to make sense that we are able to pass this 
> through to the {{OrcRecordUpdater}} without having to go through a {{byte[]}} 
> encoding step. This of course still allows users to use {{byte[]}} if they 
> wish.
> * The introduction of {{RecordMutator}} requires that insert/update/delete 
> operations are then also exposed on a {{TransactionBatch}} type. We've done 
> this with the introduction of a public {{MutatorTransactionBatch}} interface 
> which is a counterpart to the write-only {{TransactionBatch}}. We've also 
> factored out life-cycle methods common to these two interfaces into a super 
> {{BaseTransactionBatch}} interface. 
> * Functionality that would be shared by implementations of both 
> {{RecordWriters}} and {{RecordMutators}} has been factored out of 
> {{AbstractRecordWriter}} into a new abstract base class 
> {{AbstractOperationRecordWriter}}. The visibility is such that it is open to 
> extension by third parties. The {{AbstractOperationRecordWriter}} also 
> permits the setting of the {{AcidOutputFormat.Options#recordIdColumn()}} 
> (defaulted to {{-1}}) which is a requirement for enabling updates and 
> deletes. Additionally, these options are now fed an {{ObjectInspector}} via 
> an abstract method so that a {{SerDe}} is not mandated (it was not required 
> for our use-case). The {{AbstractRecordWriter}} is now much leaner, handling 
> only the extraction of the {{ObjectInspector}} from the {{SerDe}}.
> * A new abstract class, {{AbstractRecordMutator}} has been introduced to act 
> as the base of concrete {{RecordMutator}} implementations. The key 
> functionality added by this class is a validation step on {{update}} and 
> {{delete}} to ensure that the record specified contains a 
> {{RecordIdentifier}}. This was added as it is not explicitly checked for 
> elsewhere and would otherwise generate an NPE deep down in 
> {{OrcRecordUpdater}}.
> * There are now two private transaction batch implementations: 
> {{HiveEndPoint.TransactionBatchImpl}} and its insert/update/delete 
> counterpart: {{HiveEndPoint.MutationTransactionBatchImpl}}. As you might 
> expect, {{TransactionBatchImpl}} must delegate to a {{RecordWriter}} 
> implementation whereas {{MutationTransactionBatchImpl}} must delegate to a 
> {{RecordMutator}} implementation. Shared transaction batch functionality has 
> been factored out into an {{AbstractTransactionBatch}} class. In the case of 
> {{MutationTransactionBatchImpl}} we've added a check to ensure that an error 
> occurs should a user submit multiple types of operation to the same batch as 
> we've found that this can lead to inconsistent data being returned from the 
> underlying table when read from Hive.
> * To enable the usage of the different transaction batch variants we've added 
> an additional transaction batch factory method to {{StreamingConnection}} and 
> provided a suitable implementation in {{HiveEndPoint}}. It's worth noting 
> that {{StreamingConnection}} is the only public facing component of the API 
> contract that contains references to both the existing writer scheme and our 
> mutator scheme.
> Please find these changes in the attached patch: [^HIVE-10165.0.patch].


