This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch branch-1.16.x in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.16.x by this push: new 5428ea8 [java] KUDU-3349 Fix the failure to demote a leader 5428ea8 is described below commit 5428ea81612a253266e779c903ec49f98febbb07 Author: Hongjiang Zhang <hongjizh...@ebay.com> AuthorDate: Fri Jan 21 15:27:58 2022 +0800 [java] KUDU-3349 Fix the failure to demote a leader KuduScanToken got a wrong tserver UUID because it converted the UUID with ByteString.toString() instead of ByteString.toStringUtf8(). The wrong UUID looked something like '<ByteString@6dffd497 size=32 contents="fc07f681d3ea4bab9bc5ec8090ab9437">', whereas the expected UUID is "fc07f681d3ea4bab9bc5ec8090ab9437". This issue caused RemoteTablet to fail to demote a leader, so the Java client always sent write ops to the demoted leader. As a result, there were a lot of "PendingErrors overflowed. Failed to write at least 1000 rows to Kudu" errors. After this fix, write ops, especially deletes, will be faster. Change-Id: I2974b6ec2cec2f0120b113d1bcf89fe3793a1ec5 Reviewed-on: http://gerrit.cloudera.org:8080/18166 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <aser...@cloudera.com> (cherry picked from commit 90895ce76590f10730ad7aac3613b69d89ff5422) Reviewed-on: http://gerrit.cloudera.org:8080/18222 Tested-by: Alexey Serbin <aser...@cloudera.com> Reviewed-by: Attila Bukor <abu...@apache.org> --- .../java/org/apache/kudu/client/KuduScanToken.java | 64 ++++++---- .../java/org/apache/kudu/client/TestScanToken.java | 132 +++++++++++++++++++++ 2 files changed, 172 insertions(+), 24 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java index f864f98..e864089 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java @@ -205,6 +205,44 @@ public class KuduScanToken implements Comparable<KuduScanToken> { return columns; } + /** + * create a new RemoteTablet from TabletMetadata + * @param 
tabletMetadata the tablet metadata + * @param tableId the table Id + * @param partition the partition + * @return a RemoteTablet object + */ + public static RemoteTablet newRemoteTabletFromTabletMetadata( + Client.TabletMetadataPB tabletMetadata, + String tableId, + Partition partition) { + List<LocatedTablet.Replica> replicas = new ArrayList<>(); + for (Client.TabletMetadataPB.ReplicaMetadataPB replicaMetadataPB : + tabletMetadata.getReplicasList()) { + Client.ServerMetadataPB server = + tabletMetadata.getTabletServers(replicaMetadataPB.getTsIdx()); + LocatedTablet.Replica replica = new LocatedTablet.Replica( + server.getRpcAddresses(0).getHost(), + server.getRpcAddresses(0).getPort(), + replicaMetadataPB.getRole(), replicaMetadataPB.getDimensionLabel()); + replicas.add(replica); + } + + List<ServerInfo> servers = new ArrayList<>(); + for (Client.ServerMetadataPB serverMetadataPB : tabletMetadata.getTabletServersList()) { + HostAndPort hostPort = + ProtobufHelper.hostAndPortFromPB(serverMetadataPB.getRpcAddresses(0)); + final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost()); + ServerInfo serverInfo = new ServerInfo(serverMetadataPB.getUuid().toStringUtf8(), + hostPort, inetAddress, serverMetadataPB.getLocation()); + servers.add(serverInfo); + } + + RemoteTablet remoteTablet = new RemoteTablet(tableId, + tabletMetadata.getTabletId(), partition, replicas, servers); + return remoteTablet; + } + @SuppressWarnings("deprecation") private static KuduScanner.KuduScannerBuilder pbIntoScannerBuilder( ScanTokenPB message, KuduClient client) throws KuduException { @@ -226,30 +264,8 @@ public class KuduScanToken implements Comparable<KuduScanToken> { TableLocationsCache tableLocationsCache = client.asyncClient.getOrCreateTableLocationsCache(table.getTableId()); - List<LocatedTablet.Replica> replicas = new ArrayList<>(); - for (Client.TabletMetadataPB.ReplicaMetadataPB replicaMetadataPB : - tabletMetadata.getReplicasList()) { - Client.ServerMetadataPB server 
= - tabletMetadata.getTabletServers(replicaMetadataPB.getTsIdx()); - LocatedTablet.Replica replica = new LocatedTablet.Replica( - server.getRpcAddresses(0).getHost(), - server.getRpcAddresses(0).getPort(), - replicaMetadataPB.getRole(), replicaMetadataPB.getDimensionLabel()); - replicas.add(replica); - } - - List<ServerInfo> servers = new ArrayList<>(); - for (Client.ServerMetadataPB serverMetadataPB : tabletMetadata.getTabletServersList()) { - HostAndPort hostPort = - ProtobufHelper.hostAndPortFromPB(serverMetadataPB.getRpcAddresses(0)); - final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost()); - ServerInfo serverInfo = new ServerInfo(serverMetadataPB.getUuid().toString(), - hostPort, inetAddress, serverMetadataPB.getLocation()); - servers.add(serverInfo); - } - - RemoteTablet remoteTablet = new RemoteTablet(table.getTableId(), - tabletMetadata.getTabletId(), partition, replicas, servers); + RemoteTablet remoteTablet = + newRemoteTabletFromTabletMetadata(tabletMetadata, table.getTableId(), partition); tableLocationsCache.cacheTabletLocations(Collections.singletonList(remoteTablet), partition.partitionKeyStart, 1, tabletMetadata.getTtlMillis()); diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java index 87e3128..3ec8f17 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java @@ -32,6 +32,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -41,6 +42,8 @@ import java.util.Set; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import 
com.google.protobuf.CodedInputStream; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -51,6 +54,7 @@ import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.test.KuduTestHarness; +import org.apache.kudu.test.cluster.KuduBinaryLocator; import org.apache.kudu.test.cluster.MiniKuduCluster; public class TestScanToken { @@ -138,6 +142,104 @@ public class TestScanToken { } /** + * Regression test for KUDU-3349 + */ + @Test + public void testScanTokenWithWrongUuidSerialization() throws Exception { + // Prepare the table for testing. + Schema schema = createManyStringsSchema(); + CreateTableOptions createOptions = new CreateTableOptions(); + final int buckets = 8; + createOptions.addHashPartitions(ImmutableList.of("key"), buckets); + client.createTable(testTableName, schema, createOptions); + + KuduSession session = client.newSession(); + KuduTable table = client.openTable(testTableName); + final int totalRows = 100; + for (int i = 0; i < totalRows; i++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addString("key", String.format("key_%02d", i)); + row.addString("c1", "c1_" + i); + row.addString("c2", "c2_" + i); + assertEquals(session.apply(insert).hasRowError(), false); + } + KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table); + tokenBuilder.setProjectedColumnIndexes(ImmutableList.of()); + List<KuduScanToken> tokens = tokenBuilder.build(); + assertEquals(buckets, tokens.size()); + + // Create a new client, open the newly created kudu table, and new scanners. 
+ AsyncKuduClient newAsyncClient = new AsyncKuduClient.AsyncKuduClientBuilder( + harness.getMasterAddressesAsString()) + .build(); + KuduClient newClient = newAsyncClient.syncClient(); + KuduTable newTable = newClient.openTable(testTableName); + List<KuduScanner> kuduScanners = new ArrayList<>(buckets); + List<String> tabletIds = new ArrayList<>(buckets); + for (KuduScanToken token : tokens) { + tabletIds.add(new String(token.getTablet().getTabletId(), + java.nio.charset.StandardCharsets.UTF_8)); + KuduScanner kuduScanner = token.intoScanner(newAsyncClient.syncClient()); + kuduScanners.add(kuduScanner); + } + + // Step down all tablet leaders. + KuduBinaryLocator.ExecutableInfo exeInfo = null; + try { + exeInfo = KuduBinaryLocator.findBinary("kudu"); + } catch (FileNotFoundException e) { + LOG.error(e.getMessage()); + fail(); + } + for (String tabletId : tabletIds) { + List<String> commandLine = Lists.newArrayList(exeInfo.exePath(), + "tablet", + "leader_step_down", + harness.getMasterAddressesAsString(), + tabletId); + ProcessBuilder processBuilder = new ProcessBuilder(commandLine); + processBuilder.environment().putAll(exeInfo.environment()); + // Step down the tablet leaders one by one after a fix duration. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.error(e.getMessage()); + } + } + // Delete all rows first through the new client. + KuduSession newSession = newClient.newSession(); + + for (int i = 0; i < totalRows; i++) { + Operation del = newTable.newDelete(); + PartialRow row = del.getRow(); + row.addString("key", String.format("key_%02d", i)); + del.setRow(row); + OperationResponse response = newSession.apply(del); + assertEquals(response.hasRowError(), false); + } + + // Insert all rows again through the new client. 
+ for (int i = 0; i < totalRows; i++) { + Insert insert = newTable.newInsert(); + PartialRow row = insert.getRow(); + row.addString("key", String.format("key_%02d", i)); + row.addString("c1", "c1_" + i); + row.addString("c2", "c2_" + i); + assertEquals(newSession.apply(insert).hasRowError(), false); + } + + // Verify all the row count. + int rowCount = 0; + for (KuduScanner kuduScanner : kuduScanners) { + while (kuduScanner.hasMoreRows()) { + rowCount += kuduScanner.nextRows().numRows; + } + } + assertEquals(totalRows, rowCount); + } + + /** * Tests scan token creation and execution on a table with non-covering range partitions. */ @Test @@ -695,6 +797,36 @@ public class TestScanToken { } /** + * Verify the deserialization of RemoteTablet from KuduScanToken. + * Regression test for KUDU-3349. + */ + @Test + public void testRemoteTabletVerification() throws IOException { + final int NUM_ROWS_DESIRED = 100; + KuduTable table = createDefaultTable(client, testTableName); + loadDefaultTable(client, testTableName, NUM_ROWS_DESIRED); + KuduScanToken.KuduScanTokenBuilder builder = + new KuduScanToken.KuduScanTokenBuilder(asyncClient, table); + List<KuduScanToken> tokens = builder.build(); + List<HostAndPort> tservers = harness.getTabletServers(); + for (KuduScanToken token : tokens) { + byte[] serialized = token.serialize(); + Client.ScanTokenPB scanTokenPB = + Client.ScanTokenPB.parseFrom(CodedInputStream.newInstance(serialized)); + Client.TabletMetadataPB tabletMetadata = scanTokenPB.getTabletMetadata(); + Partition partition = + ProtobufHelper.pbToPartition(tabletMetadata.getPartition()); + RemoteTablet remoteTablet = KuduScanToken.newRemoteTabletFromTabletMetadata(tabletMetadata, + table.getTableId(), partition); + for (ServerInfo si : remoteTablet.getTabletServersCopy()) { + assertEquals(si.getUuid().length(), 32); + HostAndPort hostAndPort = si.getHostAndPort(); + assertEquals(tservers.contains(hostAndPort), true); + } + } + } + + /** * Regression test for KUDU-3205. 
*/ @Test