rhh777 commented on issue #154:
URL: 
https://github.com/apache/incubator-uniffle/issues/154#issuecomment-1211528644

   When I look at the RSS Client code, I see that no matter how many 
partitionNum are requested, an exception is thrown whenever the coordinator 
returns empty.
   
https://github.com/apache/incubator-uniffle/blob/master/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
   ```
   // transform [startPartition, endPartition] -> [server1, server2] to
     // {partition1 -> [server1, server2], partition2 - > [server1, server2]}
     @VisibleForTesting
     public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers(
         GetShuffleAssignmentsResponse response) {
       Map<Integer, List<ShuffleServerInfo>> partitionToServers = 
Maps.newHashMap();
       List<PartitionRangeAssignment> assigns = response.getAssignmentsList();
       for (PartitionRangeAssignment partitionRangeAssignment : assigns) {
         final int startPartition = 
partitionRangeAssignment.getStartPartition();
         final int endPartition = partitionRangeAssignment.getEndPartition();
         final List<ShuffleServerInfo> shuffleServerInfos = 
partitionRangeAssignment
             .getServerList()
             .stream()
             .map(ss -> new ShuffleServerInfo(ss.getId(), ss.getIp(), 
ss.getPort()))
             .collect(Collectors.toList());
         for (int i = startPartition; i <= endPartition; i++) {
           partitionToServers.put(i, shuffleServerInfos);
         }
       }
       if (partitionToServers.isEmpty()) {
         throw new RssException("Empty assignment to Shuffle Server");
       }
       return partitionToServers;
     }
   ```
   Is it possible to verify the partitionNum of the request and the 
partitionNum of the response, or other operations?
   ```
   // transform [startPartition, endPartition] -> [server1, server2] to
     // {partition1 -> [server1, server2], partition2 - > [server1, server2]}
     @VisibleForTesting
     public Map<Integer, List<ShuffleServerInfo>> 
getPartitionToServers(RssGetShuffleAssignmentsRequest request,
         GetShuffleAssignmentsResponse response) {
       Map<Integer, List<ShuffleServerInfo>> partitionToServers = 
Maps.newHashMap();
       List<PartitionRangeAssignment> assigns = response.getAssignmentsList();
       for (PartitionRangeAssignment partitionRangeAssignment : assigns) {
         final int startPartition = 
partitionRangeAssignment.getStartPartition();
         final int endPartition = partitionRangeAssignment.getEndPartition();
         final List<ShuffleServerInfo> shuffleServerInfos = 
partitionRangeAssignment
             .getServerList()
             .stream()
             .map(ss -> new ShuffleServerInfo(ss.getId(), ss.getIp(), 
ss.getPort()))
             .collect(Collectors.toList());
         for (int i = startPartition; i <= endPartition; i++) {
           partitionToServers.put(i, shuffleServerInfos);
         }
       }
   
       if (request.getPartitionNum() == 0 && partitionToServers.isEmpty()) {
           LOG.warn("dependency partitionNum is 0");
       }else if(partitionToServers.isEmpty()){
           throw new RssException("Empty assignment to Shuffle Server");
       }
       return partitionToServers;
     }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to