[ https://issues.apache.org/jira/browse/HIVE-11906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403511#comment-15403511 ]

Vinuraj M edited comment on HIVE-11906 at 8/2/16 10:09 AM:
-----------------------------------------------------------

I am using the Streaming ingest API to load files that arrive at regular intervals 
from another system. The way I thought of implementing the file load into Hive is 
to get one TransactionBatch instance and write the contents of one file in a single 
transaction on that batch. Basically I am trying to write one file's contents in a 
single transaction and commit it, so that in case of an error I can always attempt 
to re-process the whole file. 

Currently I am working around the API by fetching a transaction batch with more 
than one transaction but using only one of them, as in the sketch below.
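
For illustration, here is a minimal sketch of that workaround, using the same streaming calls as the reproducer quoted below (HiveEndPoint, DelimitedInputWriter, newConnection, fetchTransactionBatch). The metastore URI, table, partition, and column list are placeholders for my setup, not part of this issue:

{noformat}
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class OneFilePerTxnLoader {
  public void loadFile(String path) throws Exception {
    // Placeholder endpoint: metastore URI, db, table and partition come from my setup.
    HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083",
        "default", "store_sales", Arrays.asList("2015"));
    // Placeholder column list for the target table.
    DelimitedInputWriter writer = new DelimitedInputWriter(
        new String[] {"ss_sold_date_sk", "ss_item_sk"}, ",", endPt);
    StreamingConnection connection = endPt.newConnection(false, null);
    try {
      // Workaround for HIVE-11906: request a batch of 2 transactions even though
      // only the first one is used; requesting a batch of size 1 triggers the
      // IllegalStateException in OrcRecordUpdater.flush().
      TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
      txnBatch.beginNextTransaction();
      for (String line : Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8)) {
        txnBatch.write(line.getBytes(StandardCharsets.UTF_8));
      }
      // One file == one transaction: if anything fails, the whole file is re-processed.
      txnBatch.commit();
      txnBatch.close();
    } finally {
      connection.close();
    }
  }
}
{noformat}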


was (Author: vmaroli):
I am using the Streaming ingest API to load files that arrive at regular intervals 
from another system. The way I thought of implementing the file load into Hive is 
to get one TransactionBatch instance and write the contents of one file in a single 
transaction on that batch. Basically I am trying to write one file's contents in a 
single transaction and commit it, so that in case of an error I can always attempt 
to re-process the whole file. 

Because of this issue (HIVE-11906) I am forced to split the file contents across 
multiple transactions when loading. This makes handling error scenarios far more 
complicated than simply re-processing the whole file.

> IllegalStateException: Attempting to flush a RecordUpdater on....bucket_00000 
> with a single transaction.
> --------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-11906
>                 URL: https://issues.apache.org/jira/browse/HIVE-11906
>             Project: Hive
>          Issue Type: Bug
>          Components: HCatalog, Transactions
>    Affects Versions: 1.0.0
>            Reporter: Eugene Koifman
>            Assignee: Varadharajan
>
> {noformat}
> java.lang.IllegalStateException: Attempting to flush a RecordUpdater on hdfs://127.0.0.1:9000/user/hive/warehouse/store_sales/dt=2015/delta_0003405_0003405/bucket_00000 with a single transaction.
>       at org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater.flush(OrcRecordUpdater.java:341)
>       at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.flush(AbstractRecordWriter.java:124)
>       at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.flush(DelimitedInputWriter.java:49)
>       at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commitImpl(HiveEndPoint.java:723)
>       at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commit(HiveEndPoint.java:701)
>       at org.apache.hive.acid.RueLaLaTest.test(RueLaLaTest.java:89)
> {noformat}
> {noformat}
> package org.apache.hive.acid;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.hive.common.JavaUtils;
> import org.apache.hadoop.hive.conf.HiveConf;
> import org.apache.hadoop.hive.ql.Driver;
> import org.apache.hadoop.hive.ql.session.SessionState;
> import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
> import org.apache.hive.hcatalog.streaming.HiveEndPoint;
> import org.apache.hive.hcatalog.streaming.StreamingConnection;
> import org.apache.hive.hcatalog.streaming.TransactionBatch;
> import org.junit.Test;
> import java.net.URL;
> import java.util.ArrayList;
> import java.util.List;
> /**
>  */
> public class RueLaLaTest {
>   static final private Log LOG = LogFactory.getLog(RueLaLaTest.class);
>   @Test
>   public void test() throws Exception {
>     HiveConf.setHiveSiteLocation(new URL("file:///Users/ekoifman/dev/hwxhive/packaging/target/apache-hive-0.14.0-bin/apache-hive-0.14.0-bin/conf/hive-site.xml"));
>     HiveConf hiveConf = new HiveConf(this.getClass());
>     final String workerName = "test_0";
>     SessionState.start(new SessionState(hiveConf));
>     Driver d = new Driver(hiveConf);
>     d.setMaxRows(200002);//make sure Driver returns all results
>     runStatementOnDriver(d, "drop table if exists store_sales");
>     runStatementOnDriver(d, "create table store_sales\n" +
>       "(\n" +
>       "    ss_sold_date_sk           int,\n" +
>       "    ss_sold_time_sk           int,\n" +
>       "    ss_item_sk                int,\n" +
>       "    ss_customer_sk            int,\n" +
>       "    ss_cdemo_sk               int,\n" +
>       "    ss_hdemo_sk               int,\n" +
>       "    ss_addr_sk                int,\n" +
>       "    ss_store_sk               int,\n" +
>       "    ss_promo_sk               int,\n" +
>       "    ss_ticket_number          int,\n" +
>       "    ss_quantity               int,\n" +
>       "    ss_wholesale_cost         decimal(7,2),\n" +
>       "    ss_list_price             decimal(7,2),\n" +
>       "    ss_sales_price            decimal(7,2),\n" +
>       "    ss_ext_discount_amt       decimal(7,2),\n" +
>       "    ss_ext_sales_price        decimal(7,2),\n" +
>       "    ss_ext_wholesale_cost     decimal(7,2),\n" +
>       "    ss_ext_list_price         decimal(7,2),\n" +
>       "    ss_ext_tax                decimal(7,2),\n" +
>       "    ss_coupon_amt             decimal(7,2),\n" +
>       "    ss_net_paid               decimal(7,2),\n" +
>       "    ss_net_paid_inc_tax       decimal(7,2),\n" +
>       "    ss_net_profit             decimal(7,2)\n" +
>       ")\n" +
>       " partitioned by (dt string)\n" +
>       "clustered by (ss_store_sk, ss_promo_sk)\n" +
>       "INTO 2 BUCKETS stored as orc TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
>     runStatementOnDriver(d, "alter table store_sales add partition(dt='2015')");
>     LOG.info(workerName + " starting...");
>     List<String> partitionVals = new ArrayList<String>();
>     partitionVals.add("2015");
>     HiveEndPoint endPt = new HiveEndPoint(HiveConf.getVar(hiveConf, HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:9933"), "default", "store_sales", partitionVals);
>     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk",
>       "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
>       "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
>       "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"}, ",", endPt);
>     StreamingConnection connection = endPt.newConnection(false, null); //should this really be null?
>     TransactionBatch txnBatch =  connection.fetchTransactionBatch(1, writer);
>     LOG.info(workerName + " started txn batch");
>     txnBatch.beginNextTransaction();
>     LOG.info(workerName + " started commit txn " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()));
>     StringBuilder row = new StringBuilder();
>     for(int i = 0; i < 1; i++) {
>       for(int ints = 0; ints < 11; ints++) {
>         row.append(ints).append(',');
>       }
>       for(int decs = 0; decs < 12; decs++) {
>         row.append(i + 0.1).append(',');
>       }
>       row.setLength(row.length() - 1);
>       txnBatch.write(row.toString().getBytes());
>     }
>     txnBatch.commit();
>     txnBatch.close();
>     connection.close();
>   }
>   private List<String> runStatementOnDriver(Driver d, String stmt) throws Exception {
>     return AcidSystemTest.runStatementOnDriver(d, stmt);
>   }
> }
> {noformat}
> The key part is that the TransactionBatch has size 1; a batch size > 1 works OK.


