[ https://issues.apache.org/jira/browse/HIVE-11906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403511#comment-15403511 ]
Vinuraj M edited comment on HIVE-11906 at 8/2/16 10:09 AM:
-----------------------------------------------------------
I am using the Streaming ingest API to load files that arrive at regular intervals from another system. The way I thought of implementing the file load into Hive is to get one TransactionBatch instance and write the contents of one file with that single TransactionBatch in a single transaction. Basically, I am trying to write one file's contents in a single transaction and commit it, so that in case of an error I can always re-process the whole file. Currently I am working around the API by requesting a transaction batch of more than one transaction but using only one of them.


was (Author: vmaroli):
I am using the Streaming ingest API to load files that arrive at regular intervals from another system. The way I thought of implementing the file load into Hive is to get one TransactionBatch instance and write the contents of one file with that single TransactionBatch in a single transaction. Basically, I am trying to write one file's contents in a single transaction and commit it, so that in case of an error I can always re-process the whole file. Because of this issue (HIVE-11906) I am forced to split the file contents across multiple transactions, which makes handling error scenarios far more complicated than simply re-processing the whole file.
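For reference, a minimal sketch of the workaround described above, using the same HCatalog Streaming classes as the test case quoted below (HiveEndPoint, DelimitedInputWriter, StreamingConnection, TransactionBatch); the metastore URI and the partitionVals/columnNames/fileLines variables are illustrative placeholders:

{noformat}
// Workaround sketch: request a transaction batch of size 2 instead of 1
// (size-1 batches hit the IllegalStateException below on commit), but use
// only the first transaction to load the whole file.
HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083",   // illustrative metastore URI
    "default", "store_sales", partitionVals);                      // partitionVals, columnNames and
DelimitedInputWriter writer =                                      // fileLines assumed defined elsewhere
    new DelimitedInputWriter(columnNames, ",", endPt);
StreamingConnection connection = endPt.newConnection(false);
TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);  // > 1 works OK
try {
  txnBatch.beginNextTransaction();
  for (String line : fileLines) {       // all records of one file in one transaction
    txnBatch.write(line.getBytes());
  }
  txnBatch.commit();                    // whole file commits atomically
} catch (Exception e) {
  txnBatch.abort();                     // on any error, re-process the whole file
  throw e;
} finally {
  txnBatch.close();
  connection.close();
}
{noformat}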
> IllegalStateException: Attempting to flush a RecordUpdater on....bucket_00000 with a single transaction.
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-11906
>                 URL: https://issues.apache.org/jira/browse/HIVE-11906
>             Project: Hive
>          Issue Type: Bug
>          Components: HCatalog, Transactions
>    Affects Versions: 1.0.0
>            Reporter: Eugene Koifman
>            Assignee: Varadharajan
>
> {noformat}
> java.lang.IllegalStateException: Attempting to flush a RecordUpdater on hdfs://127.0.0.1:9000/user/hive/warehouse/store_sales/dt=2015/delta_0003405_0003405/bucket_00000 with a single transaction.
> 	at org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater.flush(OrcRecordUpdater.java:341)
> 	at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.flush(AbstractRecordWriter.java:124)
> 	at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.flush(DelimitedInputWriter.java:49)
> 	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commitImpl(HiveEndPoint.java:723)
> 	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commit(HiveEndPoint.java:701)
> 	at org.apache.hive.acid.RueLaLaTest.test(RueLaLaTest.java:89)
> {noformat}
> {noformat}
> package org.apache.hive.acid;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.hive.common.JavaUtils;
> import org.apache.hadoop.hive.conf.HiveConf;
> import org.apache.hadoop.hive.ql.Driver;
> import org.apache.hadoop.hive.ql.session.SessionState;
> import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
> import org.apache.hive.hcatalog.streaming.HiveEndPoint;
> import org.apache.hive.hcatalog.streaming.StreamingConnection;
> import org.apache.hive.hcatalog.streaming.TransactionBatch;
> import org.junit.Test;
> import java.net.URL;
> import java.util.ArrayList;
> import java.util.List;
>
> /**
>  */
> public class RueLaLaTest {
>   static final private Log LOG = LogFactory.getLog(RueLaLaTest.class);
>
>   @Test
>   public void test() throws Exception {
>     HiveConf.setHiveSiteLocation(new URL("file:///Users/ekoifman/dev/hwxhive/packaging/target/apache-hive-0.14.0-bin/apache-hive-0.14.0-bin/conf/hive-site.xml"));
>     HiveConf hiveConf = new HiveConf(this.getClass());
>     final String workerName = "test_0";
>     SessionState.start(new SessionState(hiveConf));
>     Driver d = new Driver(hiveConf);
>     d.setMaxRows(200002);//make sure Driver returns all results
>     runStatementOnDriver(d, "drop table if exists store_sales");
>     runStatementOnDriver(d, "create table store_sales\n" +
>       "(\n" +
>       "  ss_sold_date_sk int,\n" +
>       "  ss_sold_time_sk int,\n" +
>       "  ss_item_sk int,\n" +
>       "  ss_customer_sk int,\n" +
>       "  ss_cdemo_sk int,\n" +
>       "  ss_hdemo_sk int,\n" +
>       "  ss_addr_sk int,\n" +
>       "  ss_store_sk int,\n" +
>       "  ss_promo_sk int,\n" +
>       "  ss_ticket_number int,\n" +
>       "  ss_quantity int,\n" +
>       "  ss_wholesale_cost decimal(7,2),\n" +
>       "  ss_list_price decimal(7,2),\n" +
>       "  ss_sales_price decimal(7,2),\n" +
>       "  ss_ext_discount_amt decimal(7,2),\n" +
>       "  ss_ext_sales_price decimal(7,2),\n" +
>       "  ss_ext_wholesale_cost decimal(7,2),\n" +
>       "  ss_ext_list_price decimal(7,2),\n" +
>       "  ss_ext_tax decimal(7,2),\n" +
>       "  ss_coupon_amt decimal(7,2),\n" +
>       "  ss_net_paid decimal(7,2),\n" +
>       "  ss_net_paid_inc_tax decimal(7,2),\n" +
>       "  ss_net_profit decimal(7,2)\n" +
>       ")\n" +
>       " partitioned by (dt string)\n" +
>       "clustered by (ss_store_sk, ss_promo_sk)\n" +
>       "INTO 2 BUCKETS stored as orc TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
>     runStatementOnDriver(d, "alter table store_sales add partition(dt='2015')");
>
>     LOG.info(workerName + " starting...");
>     List<String> partitionVals = new ArrayList<String>();
>     partitionVals.add("2015");
>     HiveEndPoint endPt = new HiveEndPoint(HiveConf.getVar(hiveConf, HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:9933"), "default", "store_sales", partitionVals);
>     DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk",
>       "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number",
>       "ss_quantity", "ss_wholesale_cost", "ss_list_price", "ss_sales_price",
>       "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
>       "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid",
>       "ss_net_paid_inc_tax", "ss_net_profit"}, ",", endPt);
>     StreamingConnection connection = endPt.newConnection(false, null);//should this really be null?
>     TransactionBatch txnBatch = connection.fetchTransactionBatch(1, writer);
>     LOG.info(workerName + " started txn batch");
>     txnBatch.beginNextTransaction();
>     LOG.info(workerName + " started commit txn " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()));
>     StringBuilder row = new StringBuilder();
>     for(int i = 0; i < 1; i++) {
>       for(int ints = 0; ints < 11; ints++) {
>         row.append(ints).append(',');
>       }
>       for(int decs = 0; decs < 12; decs++) {
>         row.append(i + 0.1).append(',');
>       }
>       row.setLength(row.length() - 1);
>       txnBatch.write(row.toString().getBytes());
>     }
>     txnBatch.commit();
>     txnBatch.close();
>     connection.close();
>   }
>
>   private List<String> runStatementOnDriver(Driver d, String stmt) throws Exception {
>     return AcidSystemTest.runStatementOnDriver(d, stmt);
>   }
> }
> {noformat}
> The key part is that the TransactionBatch has size 1; a batch size > 1 works OK.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)