[ https://issues.apache.org/jira/browse/HIVE-10165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559879#comment-14559879 ]
Alan Gates commented on HIVE-10165:
-----------------------------------

I'll review if someone else doesn't get to it first. It will take me a few days to get to it as I'm out the rest of this week. As for the failing tests, the 5 earlier failures didn't look related to your patch. Unless we really broke trunk, it's surprising to see 600+ test failures for your later patch. Have you tried running some of these locally to see whether you can reproduce them?

Improve hive-hcatalog-streaming extensibility and support updates and deletes.
------------------------------------------------------------------------------

Key: HIVE-10165
URL: https://issues.apache.org/jira/browse/HIVE-10165
Project: Hive
Issue Type: Improvement
Components: HCatalog
Affects Versions: 1.2.0
Reporter: Elliot West
Assignee: Elliot West
Labels: streaming_api
Attachments: HIVE-10165.0.patch, HIVE-10165.4.patch, HIVE-10165.5.patch

h3. Overview

I'd like to extend the [hive-hcatalog-streaming|https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest] API so that it also supports the writing of record updates and deletes in addition to the already supported inserts.

h3. Motivation

We have many Hadoop processes outside of Hive that merge changed facts into existing datasets. Traditionally we achieve this by reading in a ground-truth dataset and a modified dataset, grouping by a key, sorting by a sequence, and then applying a function to determine inserted, updated, and deleted rows. However, in our current scheme we must rewrite all partitions that may potentially contain changes. In practice the number of mutated records is very small compared with the number of records contained in a partition. This approach results in a number of operational issues:

* Excessive write activity is required for small data changes.
* Downstream applications cannot robustly read these datasets while they are being updated.
* Due to the scale of the updates (hundreds of partitions), the scope for contention is high.

I believe we can address this problem by instead writing only the changed records to a Hive transactional table. This should drastically reduce the amount of data that we need to write and also provide a means for managing concurrent access to the data. Our existing merge processes can read and retain each record's {{ROW_ID}}/{{RecordIdentifier}} and pass this through to an updated form of the hive-hcatalog-streaming API, which will then have the required data to perform an update or delete in a transactional manner.
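To make the merge step described above concrete, below is a minimal sketch of the kind of classification function such a process applies once records have been grouped by key and sorted by sequence. All names in this sketch are illustrative; nothing here is part of the patch.

{code:java}
import java.util.Map;
import java.util.Optional;

public class ChangeClassifier {

  enum MutationType { INSERT, UPDATE, DELETE }

  /**
   * Decides which mutation (if any) to emit for a single key, given the
   * ground-truth row and the modified row (null meaning "absent").
   * Assumes records have already been grouped by key and sorted by sequence.
   */
  static Optional<MutationType> classify(Map<String, Object> groundTruth,
                                         Map<String, Object> modified) {
    if (groundTruth == null && modified != null) {
      return Optional.of(MutationType.INSERT);  // key appears for the first time
    }
    if (groundTruth != null && modified == null) {
      return Optional.of(MutationType.DELETE);  // key no longer present
    }
    if (groundTruth != null && !groundTruth.equals(modified)) {
      return Optional.of(MutationType.UPDATE);  // same key, changed attributes
    }
    return Optional.empty();                    // unchanged; nothing to write
  }
}
{code}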
h3. Benefits

* Enables the creation of large-scale dataset merge processes.
* Opens up Hive transactional functionality in an accessible manner to processes that operate outside of Hive.

h3. Implementation

Our changes do not break the existing API contracts. Instead, our approach has been to consider the functionality offered by the existing API and our proposed API as fulfilling separate and distinct use-cases. The existing API is primarily focused on the task of continuously writing large volumes of new data into a Hive table for near-immediate analysis. Our use-case, however, is concerned with the frequent but not continuous ingestion of mutations to a Hive table from some ETL merge process. Consequently, we feel it is justifiable to add our new functionality via an alternative set of public interfaces and leave the existing API as is. This keeps both APIs clean and focused, at the expense of presenting additional options to potential users. Wherever possible, shared implementation concerns have been factored out into abstract base classes that are open to third-party extension. A detailed breakdown of the changes is as follows:

* We've introduced a public {{RecordMutator}} interface whose purpose is to expose insert/update/delete operations to the user. This is a counterpart to the write-only {{RecordWriter}}. We've also factored out life-cycle methods common to these two interfaces into a super {{RecordOperationWriter}} interface (a sketch of this interface split follows this list). Note that the row representation has been changed from {{byte[]}} to {{Object}}. Within our data processing jobs our records are often available in a strongly typed and decoded form such as a POJO or a Tuple object. Therefore it seems to make sense that we are able to pass this through to the {{OrcRecordUpdater}} without having to go through a {{byte[]}} encoding step. This of course still allows users to use {{byte[]}} if they wish.
* The introduction of {{RecordMutator}} requires that insert/update/delete operations are also exposed on a {{TransactionBatch}} type. We've done this with the introduction of a public {{MutatorTransactionBatch}} interface, which is a counterpart to the write-only {{TransactionBatch}}. We've also factored out life-cycle methods common to these two interfaces into a super {{BaseTransactionBatch}} interface.
* Functionality that would be shared by implementations of both {{RecordWriters}} and {{RecordMutators}} has been factored out of {{AbstractRecordWriter}} into a new abstract base class, {{AbstractOperationRecordWriter}}. Its visibility is such that it is open to extension by third parties. The {{AbstractOperationRecordWriter}} also permits the setting of {{AcidOutputFormat.Options#recordIdColumn()}} (defaulted to {{-1}}), which is a requirement for enabling updates and deletes. Additionally, these options are now fed an {{ObjectInspector}} via an abstract method so that a {{SerDe}} is not mandated (it was not required for our use-case). The {{AbstractRecordWriter}} is now much leaner, handling only the extraction of the {{ObjectInspector}} from the {{SerDe}}.
* A new abstract class, {{AbstractRecordMutator}}, has been introduced to act as the base of concrete {{RecordMutator}} implementations. The key functionality added by this class is a validation step on {{update}} and {{delete}} to ensure that the record specified contains a {{RecordIdentifier}}. This was added as it is not explicitly checked for elsewhere and would otherwise generate an NPE deep down in {{OrcRecordUpdater}}.
* There are now two private transaction batch implementations: {{HiveEndPoint.TransactionBatchImpl}} and its insert/update/delete counterpart, {{HiveEndPoint.MutationTransactionBatchImpl}}. As you might expect, {{TransactionBatchImpl}} delegates to a {{RecordWriter}} implementation whereas {{MutationTransactionBatchImpl}} delegates to a {{RecordMutator}} implementation. Shared transaction batch functionality has been factored out into an {{AbstractTransactionBatch}} class. In the case of {{MutationTransactionBatchImpl}} we've added a check to ensure that an error occurs should a user submit multiple types of operation to the same batch, as we've found that this can lead to inconsistent data being returned from the underlying table when read from Hive.
* To enable the usage of the different transaction batch variants we've added an additional transaction batch factory method to {{StreamingConnection}} and provided a suitable implementation in {{HiveEndPoint}} (see the usage sketch at the end of this description). It's worth noting that {{StreamingConnection}} is the only public-facing component of the API contract that contains references to both the existing writer scheme and our mutator scheme.
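For illustration, here is a rough sketch of how the interface split described in the first bullet might look. Only the interface names come from this patch description; the method signatures are assumptions inferred from the existing {{RecordWriter}} contract and may not match the attached patch exactly.

{code:java}
import org.apache.hive.hcatalog.streaming.StreamingException;

/** Assumed shape of the shared life-cycle contract. */
interface RecordOperationWriter {
  void flush() throws StreamingException;
  void newBatch(Long minTxnId, Long maxTxnId) throws StreamingException;
  void closeBatch() throws StreamingException;
}

/** Write-only counterpart, mirroring the existing RecordWriter. */
interface RecordWriter extends RecordOperationWriter {
  // Rows are now Object rather than byte[], so strongly typed records
  // can be passed through without an intermediate encoding step.
  void write(long transactionId, Object record) throws StreamingException;
}

/**
 * Proposed mutation operations; updates and deletes require that the
 * record carries the RecordIdentifier captured when it was read.
 */
interface RecordMutator extends RecordOperationWriter {
  void insert(long transactionId, Object record) throws StreamingException;
  void update(long transactionId, Object record) throws StreamingException;
  void delete(long transactionId, Object record) throws StreamingException;
}
{code}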
Please find these changes in the attached patch: [^HIVE-10165.0.patch].
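Finally, a hedged sketch of how a client might drive the new transaction batch variant end-to-end. The factory method name {{fetchMutatorTransactionBatch}} and the {{MutatorTransactionBatch}} method names below are hypothetical, inferred from the existing {{fetchTransactionBatch(int, RecordWriter)}} / {{TransactionBatch}} contract; the attached patch may differ.

{code:java}
import java.util.List;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
// Imports for the proposed RecordMutator / MutatorTransactionBatch types
// omitted; their package is defined by the patch.

public class MutationIngestExample {

  /**
   * Applies a set of pre-classified updates to a transactional table.
   * Each record must carry the RecordIdentifier captured when it was read.
   * Deletes would be submitted through a separate batch, since mixing
   * operation types within one batch is rejected (see above).
   */
  static void applyUpdates(StreamingConnection conn, RecordMutator mutator,
                           List<Object> updates) throws Exception {
    // Hypothetical counterpart to fetchTransactionBatch(int, RecordWriter).
    MutatorTransactionBatch batch =
        conn.fetchMutatorTransactionBatch(10, mutator);
    try {
      batch.beginNextTransaction();
      for (Object record : updates) {
        batch.update(record);
      }
      batch.commit();
    } finally {
      batch.close();
    }
  }
}
{code}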