[
https://issues.apache.org/jira/browse/PHOENIX-900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325201#comment-14325201
]
ASF GitHub Bot commented on PHOENIX-900:
----------------------------------------
Github user elilevine commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/37#discussion_r24869755
--- Diff:
phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static
org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static
org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static
org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class PartialCommitIT {
+
+ private static final String TABLE_NAME_TO_FAIL =
"b_failure_table".toUpperCase();
+ private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me");
+ private static final String UPSERT_TO_FAIL = "upsert into " +
TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')";
+ private static final String DELETE_TO_FAIL = "delete from " +
TABLE_NAME_TO_FAIL + " where k='z'";
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
+ private static String url;
+ private static Driver driver;
+ private static final Properties props = new Properties();
+
+ static {
+ props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ setUpConfigForMiniCluster(conf);
+ conf.setClass("hbase.coprocessor.region.classes",
FailingRegionObserver.class, RegionObserver.class);
+ conf.setBoolean("hbase.coprocessor.abortonerror", false);
+ conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
+ TEST_UTIL.startMiniCluster();
+ String clientPort =
TEST_UTIL.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
+ url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST +
JDBC_PROTOCOL_SEPARATOR + clientPort
+ + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ // Must update config before starting server
+ props.put(QueryServices.DROP_METADATA_ATTRIB,
Boolean.toString(true));
+ driver = initAndRegisterDriver(url, new
ReadOnlyProps(props.entrySet().iterator()));
+ createTablesWithABitOfData();
+ }
+
+ private static void createTablesWithABitOfData() throws Exception {
+ Properties props = new Properties();
+ props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
+
+ try (Connection con = driver.connect(url, new Properties())) {
+ Statement sta = con.createStatement();
+ sta.execute("create table a_success_table (k varchar primary
key, c varchar)");
+ sta.execute("create table b_failure_table (k varchar primary
key, c varchar)");
+ sta.execute("create table c_success_table (k varchar primary
key, c varchar)");
+ con.commit();
+ }
+
+ props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
+
+ try (Connection con = driver.connect(url, new Properties())) {
+ con.setAutoCommit(false);
+ Statement sta = con.createStatement();
+ for (String table : newHashSet("a_success_table",
TABLE_NAME_TO_FAIL, "c_success_table")) {
+ sta.execute("upsert into " + table + " values ('z', 'z')");
+ sta.execute("upsert into " + table + " values ('zz',
'zz')");
+ sta.execute("upsert into " + table + " values ('zzz',
'zzz')");
--- End diff --
Since callers supply bind variables, they can keep track of them if they
are interested in handling CommitException.getUncommittedStatementIndexes()
correctly. e.g.:
```
try {
PreparedStatement ps = con.prepareStatement("upsert into t values
(?,?)");
ps.setString(1, "a");
ps.setString(2, "a");
ps.executeUpdate(); // statement index 0
ps.setString(1, "b");
ps.setString(2, "b");
ps.executeUpdate(); // statement index 1
con.commit();
} catch (CommitException ce) {
Set<Integer> uncommittedStatementIndexes =
ce.getUncommittedStatementIndexes();
// do something with uncommittedStatementIndexes
}
```
> Partial results for mutations
> -----------------------------
>
> Key: PHOENIX-900
> URL: https://issues.apache.org/jira/browse/PHOENIX-900
> Project: Phoenix
> Issue Type: Bug
> Affects Versions: 3.0.0, 4.0.0
> Reporter: Eli Levine
> Assignee: Eli Levine
> Attachments: PHOENIX-900.patch
>
>
> HBase provides a way to retrieve partial results of a batch operation:
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/HTable.html#batch%28java.util.List,%20java.lang.Object[]%29
> Chatted with James about this offline:
> Yes, this could be included in the CommitException we throw
> (MutationState:412). We already include the batches that have been
> successfully committed to the HBase server in this exception. Would you be up
> for adding this additional information? You'd want to surface this in a
> Phoenix-y way in a method on CommitException, something like this: ResultSet
> getPartialCommits(). You can easily create an in memory ResultSet using
> MaterializedResultIterator plus the PhoenixResultSet constructor that accepts
> this (just create a new empty PhoenixStatement with the PhoenixConnection for
> the other arg).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)