Eugene Koifman created HIVE-11906:
-------------------------------------
Summary: IllegalStateException: Attempting to flush a
RecordUpdater on....bucket_00000 with a single transaction.
Key: HIVE-11906
URL: https://issues.apache.org/jira/browse/HIVE-11906
Project: Hive
Issue Type: Bug
Components: HCatalog, Transactions
Affects Versions: 1.0.0
Reporter: Eugene Koifman
Assignee: Roshan Naik
{noformat}
java.lang.IllegalStateException: Attempting to flush a RecordUpdater on
hdfs://127.0.0.1:9000/user/hive/warehouse/store_sales/dt=2015/delta_0003405_0003405/bucket_00000
with a single transaction.
at
org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater.flush(OrcRecordUpdater.java:341)
at
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.flush(AbstractRecordWriter.java:124)
at
org.apache.hive.hcatalog.streaming.DelimitedInputWriter.flush(DelimitedInputWriter.java:49)
at
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commitImpl(HiveEndPoint.java:723)
at
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commit(HiveEndPoint.java:701)
at org.apache.hive.acid.RueLaLaTest.test(RueLaLaTest.java:89)
{noformat}
{noformat}
package org.apache.hive.acid;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.junit.Test;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/**
*/
public class RueLaLaTest {
static final private Log LOG = LogFactory.getLog(RueLaLaTest.class);
@Test
public void test() throws Exception {
HiveConf.setHiveSiteLocation(new
URL("file:///Users/ekoifman/dev/hwxhive/packaging/target/apache-hive-0.14.0-bin/apache-hive-0.14.0-bin/conf/hive-site.xml"));
HiveConf hiveConf = new HiveConf(this.getClass());
final String workerName = "test_0";
SessionState.start(new SessionState(hiveConf));
Driver d = new Driver(hiveConf);
d.setMaxRows(200002);//make sure Driver returns all results
runStatementOnDriver(d, "drop table if exists store_sales");
runStatementOnDriver(d, "create table store_sales\n" +
"(\n" +
" ss_sold_date_sk int,\n" +
" ss_sold_time_sk int,\n" +
" ss_item_sk int,\n" +
" ss_customer_sk int,\n" +
" ss_cdemo_sk int,\n" +
" ss_hdemo_sk int,\n" +
" ss_addr_sk int,\n" +
" ss_store_sk int,\n" +
" ss_promo_sk int,\n" +
" ss_ticket_number int,\n" +
" ss_quantity int,\n" +
" ss_wholesale_cost decimal(7,2),\n" +
" ss_list_price decimal(7,2),\n" +
" ss_sales_price decimal(7,2),\n" +
" ss_ext_discount_amt decimal(7,2),\n" +
" ss_ext_sales_price decimal(7,2),\n" +
" ss_ext_wholesale_cost decimal(7,2),\n" +
" ss_ext_list_price decimal(7,2),\n" +
" ss_ext_tax decimal(7,2),\n" +
" ss_coupon_amt decimal(7,2),\n" +
" ss_net_paid decimal(7,2),\n" +
" ss_net_paid_inc_tax decimal(7,2),\n" +
" ss_net_profit decimal(7,2)\n" +
")\n" +
" partitioned by (dt string)\n" +
"clustered by (ss_store_sk, ss_promo_sk)\n" +
"INTO 2 BUCKETS stored as orc TBLPROPERTIES ('orc.compress'='NONE',
'transactional'='true')");
runStatementOnDriver(d, "alter table store_sales add partition(dt='2015')");
LOG.info(workerName + " starting...");
List<String> partitionVals = new ArrayList<String>();
partitionVals.add("2015");
HiveEndPoint endPt = new HiveEndPoint(HiveConf.getVar(hiveConf,
HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:9933"), "default",
"store_sales", partitionVals);
DelimitedInputWriter writer = new DelimitedInputWriter(new String[]
{"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
"ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk",
"ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
"ss_wholesale_cost", "ss_list_price", "ss_sales_price",
"ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
"ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid",
"ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
StreamingConnection connection = endPt.newConnection(false, null);//should
this really be null?
TransactionBatch txnBatch = connection.fetchTransactionBatch(1, writer);
LOG.info(workerName + " started txn batch");
txnBatch.beginNextTransaction();
LOG.info(workerName + " started commit txn " +
JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()));
StringBuilder row = new StringBuilder();
for(int i = 0; i < 1; i++) {
for(int ints = 0; ints < 11; ints++) {
row.append(ints).append(',');
}
for(int decs = 0; decs < 12; decs++) {
row.append(i + 0.1).append(',');
}
row.setLength(row.length() - 1);
txnBatch.write(row.toString().getBytes());
}
txnBatch.commit();
txnBatch.close();
connection.close();
}
private List<String> runStatementOnDriver(Driver d, String stmt) throws
Exception {
return AcidSystemTest.runStatementOnDriver(d, stmt);
}
}
{noformat}
The key part is that the TransactionBatch has size 1; a batch size greater than 1 works OK.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)