Eugene Koifman created HIVE-11906:
-------------------------------------

             Summary: IllegalStateException: Attempting to flush a 
RecordUpdater on....bucket_00000 with a single transaction.
                 Key: HIVE-11906
                 URL: https://issues.apache.org/jira/browse/HIVE-11906
             Project: Hive
          Issue Type: Bug
          Components: HCatalog, Transactions
    Affects Versions: 1.0.0
            Reporter: Eugene Koifman
            Assignee: Roshan Naik


{noformat}
java.lang.IllegalStateException: Attempting to flush a RecordUpdater on 
hdfs://127.0.0.1:9000/user/hive/warehouse/store_sales/dt=2015/delta_0003405_0003405/bucket_00000
 with a single transaction.
        at 
org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater.flush(OrcRecordUpdater.java:341)
        at 
org.apache.hive.hcatalog.streaming.AbstractRecordWriter.flush(AbstractRecordWriter.java:124)
        at 
org.apache.hive.hcatalog.streaming.DelimitedInputWriter.flush(DelimitedInputWriter.java:49)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commitImpl(HiveEndPoint.java:723)
        at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commit(HiveEndPoint.java:701)
        at org.apache.hive.acid.RueLaLaTest.test(RueLaLaTest.java:89)
{noformat}

{noformat}
package org.apache.hive.acid;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.junit.Test;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;

/**
 * Standalone reproduction for HIVE-11906: committing a streaming
 * {@code TransactionBatch} of size 1 fails with
 * "IllegalStateException: Attempting to flush a RecordUpdater ... with a single transaction"
 * raised from {@code OrcRecordUpdater.flush()}. Batch sizes greater than 1 do not hit the bug.
 *
 * NOTE(review): the hive-site.xml path and the metastore URI below are hard-coded to the
 * reporter's machine — adjust before running locally. Requires a running metastore and HDFS.
 */
public class RueLaLaTest {
  static final private Log LOG = LogFactory.getLog(RueLaLaTest.class);
  @Test
  public void test() throws Exception {
    // Point HiveConf at a local hive-site.xml (reporter-specific absolute path).
    HiveConf.setHiveSiteLocation(new 
URL("file:///Users/ekoifman/dev/hwxhive/packaging/target/apache-hive-0.14.0-bin/apache-hive-0.14.0-bin/conf/hive-site.xml"));
    HiveConf hiveConf = new HiveConf(this.getClass());
    final String workerName = "test_0";
    SessionState.start(new SessionState(hiveConf));
    Driver d = new Driver(hiveConf);
    d.setMaxRows(200002);//make sure Driver returns all results
    // Recreate the target table: transactional ORC, 2 buckets, partitioned by dt.
    runStatementOnDriver(d, "drop table if exists store_sales");
    runStatementOnDriver(d, "create table store_sales\n" +
      "(\n" +
      "    ss_sold_date_sk           int,\n" +
      "    ss_sold_time_sk           int,\n" +
      "    ss_item_sk                int,\n" +
      "    ss_customer_sk            int,\n" +
      "    ss_cdemo_sk               int,\n" +
      "    ss_hdemo_sk               int,\n" +
      "    ss_addr_sk                int,\n" +
      "    ss_store_sk               int,\n" +
      "    ss_promo_sk               int,\n" +
      "    ss_ticket_number          int,\n" +
      "    ss_quantity               int,\n" +
      "    ss_wholesale_cost         decimal(7,2),\n" +
      "    ss_list_price             decimal(7,2),\n" +
      "    ss_sales_price            decimal(7,2),\n" +
      "    ss_ext_discount_amt       decimal(7,2),\n" +
      "    ss_ext_sales_price        decimal(7,2),\n" +
      "    ss_ext_wholesale_cost     decimal(7,2),\n" +
      "    ss_ext_list_price         decimal(7,2),\n" +
      "    ss_ext_tax                decimal(7,2),\n" +
      "    ss_coupon_amt             decimal(7,2),\n" +
      "    ss_net_paid               decimal(7,2),\n" +
      "    ss_net_paid_inc_tax       decimal(7,2),\n" +
      "    ss_net_profit             decimal(7,2)\n" +
      ")\n" +
      " partitioned by (dt string)\n" +
      "clustered by (ss_store_sk, ss_promo_sk)\n" +
      "INTO 2 BUCKETS stored as orc TBLPROPERTIES ('orc.compress'='NONE', 
'transactional'='true')");

    // Pre-create the partition the streaming endpoint will write into.
    runStatementOnDriver(d, "alter table store_sales add partition(dt='2015')");
    LOG.info(workerName + " starting...");
    List<String> partitionVals = new ArrayList<String>();
    partitionVals.add("2015");
    // Streaming endpoint for default.store_sales, partition dt=2015.
    // Falls back to thrift://localhost:9933 if METASTOREURIS is unset.
    HiveEndPoint endPt = new HiveEndPoint(HiveConf.getVar(hiveConf, 
HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:9933"), "default", 
"store_sales", partitionVals);
    // Comma-delimited writer; field order matches the column order of the DDL above.
    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] 
{"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk", 
      "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", 
"ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", 
      "ss_wholesale_cost", "ss_list_price", "ss_sales_price", 
"ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", 
      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", 
"ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
    StreamingConnection connection = endPt.newConnection(false, null);//should 
this really be null?

    // THE TRIGGER: a batch of exactly 1 transaction. commit() of this single
    // transaction fails in OrcRecordUpdater.flush() (see stack trace above);
    // batch sizes > 1 work fine.
    TransactionBatch txnBatch =  connection.fetchTransactionBatch(1, writer);
    LOG.info(workerName + " started txn batch");
    txnBatch.beginNextTransaction();
    LOG.info(workerName + " started commit txn " + 
JavaUtils.txnIdToString(txnBatch.getCurrentTxnId()));

    // Build one row: 11 int columns then 12 decimal columns, comma-separated.
    // NOTE(review): `row` is never reset between iterations — harmless here
    // because the loop runs exactly once, but rows would be malformed if the
    // bound were raised above 1.
    StringBuilder row = new StringBuilder();
    for(int i = 0; i < 1; i++) {
      for(int ints = 0; ints < 11; ints++) {
        row.append(ints).append(',');
      }
      for(int decs = 0; decs < 12; decs++) {
        row.append(i + 0.1).append(',');
      }
      row.setLength(row.length() - 1);
      // getBytes() uses the platform default charset — presumably fine for
      // this ASCII-only test data.
      txnBatch.write(row.toString().getBytes());
    }
    // commit() flushes the RecordUpdater and throws the IllegalStateException.
    txnBatch.commit();
    txnBatch.close();
    connection.close();
  }
  // Delegates to a shared test helper (AcidSystemTest is defined elsewhere).
  private List<String> runStatementOnDriver(Driver d, String stmt) throws 
Exception {
    return AcidSystemTest.runStatementOnDriver(d, stmt);
  }
}
{noformat}

The key part is that the TransactionBatch has size 1; batch sizes greater than 1 work OK.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to