goiri commented on code in PR #5123: URL: https://github.com/apache/hadoop/pull/5123#discussion_r1030803567
########## hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java: ########## @@ -77,11 +83,45 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) { @Override public synchronized void receiveResponseState(RpcResponseHeaderProto header) { if (header.hasRouterFederatedState()) { - routerFederatedState = header.getRouterFederatedState(); + routerFederatedState = mergeRouterFederatedState(header.getRouterFederatedState()); } else { lastSeenStateId.accumulate(header.getStateId()); } } + /** + * Utility function to parse routerFederatedState field in RPC headers. + */ + public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) { + if (byteString == null) { + return Collections.emptyMap(); + } + + RouterFederatedStateProto federatedState; + try { + federatedState = RouterFederatedStateProto.parseFrom(byteString); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + return federatedState.getNamespaceStateIdsMap(); Review Comment: Might be cleaner to do this inside of the try and move the declaration of the var too. Then you can return Collections.emptyMap(); at the end ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java: ########## @@ -83,8 +83,8 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) if (namespaceIdMap.isEmpty()) { return; } - HdfsServerFederationProtos.RouterFederatedStateProto.Builder federatedStateBuilder = - HdfsServerFederationProtos.RouterFederatedStateProto.newBuilder(); + RouterFederatedStateProto.Builder federatedStateBuilder = Review Comment: Single line? ########## hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java: ########## @@ -77,11 +83,45 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) { @Override public synchronized void receiveResponseState(RpcResponseHeaderProto header) { if (header.hasRouterFederatedState()) { - routerFederatedState = header.getRouterFederatedState(); + routerFederatedState = mergeRouterFederatedState(header.getRouterFederatedState()); } else { lastSeenStateId.accumulate(header.getStateId()); } } + /** + * Utility function to parse routerFederatedState field in RPC headers. + */ + public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) { + if (byteString == null) { + return Collections.emptyMap(); + } + + RouterFederatedStateProto federatedState; + try { + federatedState = RouterFederatedStateProto.parseFrom(byteString); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + return federatedState.getNamespaceStateIdsMap(); + } + + /** + * Merge the local FederatedState and RemoteFederateState to get the max value for each namespace. + * @param remoteState the remote RouterFederatedState. + * @return one ByteString object which contains the max value of each namespace. + */ + private ByteString mergeRouterFederatedState(ByteString remoteState) { Review Comment: Can this be static? ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java: ########## @@ -439,4 +445,39 @@ public void testRouterMsync() throws Exception { assertEquals("Four calls should be sent to active", 4, rpcCountForActive); } + + @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testClientReceiveResponseState() { + ClientGSIContext clientGSIContext = new ClientGSIContext(); + + Map<String, Long> mockMapping = new HashMap<>(); + mockMapping.put("ns0", 10L); + RouterFederatedStateProto.Builder federatedStateBuilder = Review Comment: Single line? ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto: ########## @@ -311,17 +311,4 @@ message GetDisabledNameservicesRequestProto { message GetDisabledNameservicesResponseProto { repeated string nameServiceIds = 1; -} - -///////////////////////////////////////////////// -// Alignment state for namespaces. -///////////////////////////////////////////////// - -/** - * Clients should receive this message in RPC responses and forward it - * in RPC requests without interpreting it. It should be encoded - * as an obscure byte array when being sent to clients. - */ -message RouterFederatedStateProto { - map<string, int64> namespaceStateIds = 1; // Last seen state IDs for multiple namespaces. Review Comment: Can we change the protocol so freely? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org