ddanielr commented on code in PR #5375:
URL: https://github.com/apache/accumulo/pull/5375#discussion_r1995837567
##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -157,62 +160,150 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exce
   private static class OnlineLoader extends Loader {
+    private final int maxConnections;
     long timeInMillis;
     String fmtTid;
     int locationLess = 0;
-    // track how many tablets were sent load messages per tablet server
-    MapCounter<HostAndPort> loadMsgs;
+    int tabletsAdded;
     // Each RPC to a tablet server needs to check in ZooKeeper to see if the transaction is still
     // active. The purpose of this map is to group load requests by tablet server in order to do
     // fewer RPCs. Fewer RPCs result in fewer calls to ZooKeeper.
     Map<HostAndPort,Map<TKeyExtent,Map<String,MapFileInfo>>> loadQueue;
     private int queuedDataSize = 0;
+    public OnlineLoader(AccumuloConfiguration configuration) {
+      super();
+      this.maxConnections = configuration.getCount(Property.MANAGER_BULK_MAX_CONNECTIONS);
+    }
+
     @Override
     void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
       super.start(bulkDir, manager, tid, setTime);
       timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
       fmtTid = FateTxId.formatTid(tid);
-      loadMsgs = new MapCounter<>();
+      tabletsAdded = 0;
       loadQueue = new HashMap<>();
     }
+    private static class Client {
+      final HostAndPort server;
+      final TabletClientService.Client service;
+
+      private Client(HostAndPort server, TabletClientService.Client service) {
+        this.server = server;
+        this.service = service;
+      }
+    }
+
     private void sendQueued(int threshhold) {
       if (queuedDataSize > threshhold || threshhold == 0) {
+        var sendTimer = Timer.startNew();
+
+        List<Client> clients = new ArrayList<>();
+        try {
+
+          // Send load messages to tablet servers, spinning up work, but do not wait on results.
+          loadQueue.forEach((server, tabletFiles) -> {
+
+            if (log.isTraceEnabled()) {
+              log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
+                  tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
+            }
+
+            // Tablet servers process tablets serially and perform a single metadata table write
+            // for each tablet. Break the work into per-tablet chunks so it can be sent over
+            // multiple connections to the tserver, allowing each chunk to be run in parallel on
+            // the server side. This allows multiple threads on a single tserver to do metadata
+            // writes for this bulk import.
+            int neededConnections = Math.min(maxConnections, tabletFiles.size());
+            List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks =
+                new ArrayList<>(neededConnections);
+            for (int i = 0; i < neededConnections; i++) {
+              chunks.add(new HashMap<>());
+            }
+
+            int nextConnection = 0;
+            for (var entry : tabletFiles.entrySet()) {
+              chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(), entry.getValue());
+            }
+
+            for (var chunk : chunks) {
+              try {
+                var client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
+                    manager.getContext(), timeInMillis);
+                // Add the client to the list before calling send in case there is an exception;
+                // this makes sure it's returned in the finally.
+                clients.add(new Client(server, client));
+                client.send_loadFilesV2(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
+                    bulkDir.toString(), chunk, setTime);
+              } catch (TException ex) {
+                log.debug("rpc send failed server: {}, {}", server, fmtTid, ex);
+              }
+            }
+          });
+
+          long sendTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
+          sendTimer.restart();
+
+          int outdatedTservers = 0;
+
+          // wait for all the tservers to complete processing
+          for (var client : clients) {
+            try {
+              client.service.recv_loadFilesV2();
+            } catch (TException ex) {
+              String additionalInfo = "";
+              if (ex instanceof TApplicationException && ((TApplicationException) ex).getType()
+                  == TApplicationException.UNKNOWN_METHOD) {
+                // A new RPC method was added in 2.1.4, a tserver running 2.1.3 or earlier will
+                // not have this RPC. This should not kill the fate operation, it can wait until
+                // all tablet servers are upgraded.
+                outdatedTservers++;
+                additionalInfo = " (tserver may be running older version)";
+              }
+              log.debug("rpc recv failed server{}: {}, {}", additionalInfo, client.server, fmtTid,
+                  ex);
+            }
+          }
-          if (log.isTraceEnabled()) {
-            log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
-                tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
+          if (outdatedTservers > 0) {
+            log.warn(
+                "{} cannot proceed with bulk import because {} tablet servers are likely running "
+                    + "an older version. Please update tablet servers to the same patch level as the manager.",
+                fmtTid, outdatedTservers);
           }
-          TabletClientService.Client client = null;
-          try {
-            client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
-                manager.getContext(), timeInMillis);
-            client.loadFiles(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
-                bulkDir.toString(), tabletFiles, setTime);
-          } catch (TException ex) {
-            log.debug("rpc failed server: " + server + ", " + fmtTid + " " + ex.getMessage(), ex);
-          } finally {
-            ThriftUtil.returnClient(client, manager.getContext());
+          if (log.isDebugEnabled()) {
+            var recvTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
+            var tabletStats = loadQueue.values().stream().mapToInt(Map::size).summaryStatistics();
+            log.debug(
+                "{} sent {} messages to {} tablet servers for {} tablets (min:{} max:{} avg:{} " +
+                    "tablets per tserver), send time:{}ms recv time:{}ms {}:{}",
Review Comment:
   Fix formatting: move the string-concatenation `+` to the start of the continuation line rather than leaving it at the end of the previous one.
```suggestion
                "{} sent {} messages to {} tablet servers for {} tablets (min:{} max:{} avg:{} "
                    + "tablets per tserver), send time:{}ms recv time:{}ms {}:{}",
```
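
   For anyone who wants to experiment with the chunking logic above in isolation, here is a minimal standalone sketch of the same round-robin split. The names (`ChunkDemo`, `splitIntoChunks`) are invented for illustration, and plain `String` keys stand in for the PR's `TKeyExtent`/`MapFileInfo` types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChunkDemo {

  // Split one server's tablet->files map into at most maxConnections chunks,
  // assigning tablets round-robin so each chunk can be sent on its own connection.
  static List<Map<String,String>> splitIntoChunks(Map<String,String> tabletFiles,
      int maxConnections) {
    int neededConnections = Math.min(maxConnections, tabletFiles.size());
    List<Map<String,String>> chunks = new ArrayList<>(neededConnections);
    for (int i = 0; i < neededConnections; i++) {
      chunks.add(new HashMap<>());
    }
    int nextConnection = 0;
    for (var entry : tabletFiles.entrySet()) {
      chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(), entry.getValue());
    }
    return chunks;
  }

  public static void main(String[] args) {
    Map<String,String> tabletFiles =
        Map.of("t1", "f1", "t2", "f2", "t3", "f3", "t4", "f4", "t5", "f5");
    // With maxConnections = 2, the five tablets land in chunks of sizes 3 and 2.
    splitIntoChunks(tabletFiles, 2).forEach(chunk -> System.out.println(chunk.keySet()));
  }
}
```

   The round-robin assignment keeps chunk sizes within one tablet of each other, so no single connection ends up with a disproportionate share of the metadata writes.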