[ https://issues.apache.org/jira/browse/HBASE-13997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14609571#comment-14609571 ]
Zephyr Guo commented on HBASE-13997: ------------------------------------ Yes,I'm learning workflow. > ScannerCallableWithReplicas cause Infinitely blocking > ----------------------------------------------------- > > Key: HBASE-13997 > URL: https://issues.apache.org/jira/browse/HBASE-13997 > Project: HBase > Issue Type: Bug > Components: Client > Affects Versions: 1.0.1.1 > Reporter: Zephyr Guo > Assignee: Zephyr Guo > Priority: Minor > > Bug in ScannerCallableWithReplicas.addCallsForOtherReplicas method > {code:title=code in ScannerCallableWithReplicas.addCallsForOtherReplicas > |borderStyle=solid} > private int addCallsForOtherReplicas( > BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, > RegionLocations rl, int min, > int max) { > if (scan.getConsistency() == Consistency.STRONG) { > return 0; // not scheduling on other replicas for strong consistency > } > for (int id = min; id <= max; id++) { > if (currentScannerCallable.getHRegionInfo().getReplicaId() == id) { > continue; //this was already scheduled earlier > } > ScannerCallable s = > currentScannerCallable.getScannerCallableForReplica(id); > if (this.lastResult != null) { > s.getScan().setStartRow(this.lastResult.getRow()); > } > outstandingCallables.add(s); > RetryingRPC retryingOnReplica = new RetryingRPC(s); > cs.submit(retryingOnReplica); > } > return max - min + 1; //bug? should be "max - min",because "continue" > //always happen once > } > {code} > It can cause completed < submitted always so that the following code will be > infinitely blocked. > {code:title=code in ScannerCallableWithReplicas.call|borderStyle=solid} > // submitted larger than the actual one > submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); > try { > //here will be affected > while (completed < submitted) { > try { > Future<Pair<Result[], ScannerCallable>> f = cs.take(); > Pair<Result[], ScannerCallable> r = f.get(); > if (r != null && r.getSecond() != null) { > updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, > pool); > } > return r == null ? null : r.getFirst(); // great we got an answer > } catch (ExecutionException e) { > // if not cancel or interrupt, wait until all RPC's are done > // one of the tasks failed. Save the exception for later. > if (exceptions == null) exceptions = new > ArrayList<ExecutionException>(rl.size()); > exceptions.add(e); > completed++; > } > } > } catch (CancellationException e) { > throw new InterruptedIOException(e.getMessage()); > } catch (InterruptedException e) { > throw new InterruptedIOException(e.getMessage()); > } finally { > // We get there because we were interrupted or because one or more of > the > // calls succeeded or failed. In all case, we stop all our tasks. > cs.cancelAll(true); > } > {code} > If all replica-RS occur ExecutionException ,it will be infinitely blocked in > cs.take() -- This message was sent by Atlassian JIRA (v6.3.4#6332)