reswqa commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1160799424


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String clusterId) {
 
     @Override
     public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
-        final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems();
+        final List<Pod> podList =
+                this.internalClient
+                        .pods()
+                        .withLabels(labels)
+                        .list(new ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   ```suggestion
                           .list(new ListOptionsBuilder().withResourceVersion("0").build())
   ```
   We should add a comment explaining this magic number.
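
   One way to address this, as a minimal sketch only: pull the value into a named constant and document it there. The class name `ResourceVersionConstants`, the constant name, and the helper `podListQuery` are hypothetical and not part of the PR. The sketch uses only the JDK and builds the query string by hand instead of calling the fabric8 client. The Javadoc reflects the documented Kubernetes behavior that `resourceVersion=0` means "any version", so the API server may answer from its cache.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical holder class; name and location are assumptions, not part of the PR. */
final class ResourceVersionConstants {

    /**
     * Kubernetes interprets resourceVersion="0" on list and watch requests as "any version":
     * the API server may serve the request from its cache instead of doing a quorum read
     * against etcd. The result can be slightly stale, but the request is cheaper.
     */
    static final String KUBERNETES_ZERO_RESOURCE_VERSION = "0";

    private ResourceVersionConstants() {}

    /** Builds the query part of a pod list request; labels are joined in iteration order. */
    static String podListQuery(Map<String, String> labels) {
        String selector =
                labels.entrySet().stream()
                        .map(e -> e.getKey() + "=" + e.getValue())
                        .collect(Collectors.joining(","));
        return "labelSelector="
                + URLEncoder.encode(selector, StandardCharsets.UTF_8)
                + "&resourceVersion="
                + KUBERNETES_ZERO_RESOURCE_VERSION;
    }
}
```

   With a constant like this, both the list call and the watch call could reference the same documented value instead of repeating the literal `"0"`.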



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -233,6 +239,7 @@ public KubernetesWatch watchPodsAndDoCallback(
                                                         this.internalClient
                                                                 .pods()
                                                                 .withLabels(labels)
+                                                                .withResourceVersion("0")

Review Comment:
   Same as the previous comment: please add a comment explaining this magic number.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
   I'm not very familiar with this API. Can anyone tell me whether this means we 
will always wait for `1` second, or at most `1` second? If it is the former, we 
should avoid it, because it will noticeably slow down the test run on `AZP`.
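
   To make the distinction in this question concrete, here is a small JDK-only sketch of the two behaviors. It does not use the mock server API and makes no claim about which one `waitFor(1000)` implements; the class `WaitSemanticsDemo` and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class contrasting a fixed delay with a bounded wait. */
final class WaitSemanticsDemo {

    private WaitSemanticsDemo() {}

    /** Fixed delay: blocks for the full duration, whatever else happens. */
    static long fixedDelayMillis(long millis) {
        long start = System.nanoTime();
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Bounded wait: returns as soon as the latch reaches zero, or once the timeout expires. */
    static long boundedWaitMillis(CountDownLatch latch, long timeoutMillis) {
        long start = System.nanoTime();
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

   A fixed delay adds its full duration to every test run, while a bounded wait only costs the timeout in the failure case.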



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
                 .isEmpty();
     }
 
+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(TESTING_LABELS);
+        // the count latch for events.
+        final CountDownLatch eventLatch = new CountDownLatch(4);
+        this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
   It seems that we have too many classes that implement 
`FlinkKubeClient.WatchCallbackHandler<T>` only for testing purposes. We'd better 
introduce a more general and reusable `TestingKubernetesPodCallbackHandler` in 
the first commit and rewrite all the other implementations, such as 
`NoOpWatchCallbackHandler` and `TestingCallbackHandler`, on top of it.
   
   Don't worry if you are not sure how to do this refactoring; I can push 
this commit as an example.
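
   A rough sketch of what such a reusable handler might look like, under stated assumptions: the `WatchCallbackHandler<T>` interface below is a local stand-in whose method names are assumed to mirror `FlinkKubeClient.WatchCallbackHandler<T>`, and the class name `TestingWatchCallbackHandler` and its builder are hypothetical. Every callback defaults to a no-op, so a test only sets the ones it cares about.

```java
import java.util.List;
import java.util.function.Consumer;

/** Local stand-in for FlinkKubeClient.WatchCallbackHandler<T>; method names are assumed. */
interface WatchCallbackHandler<T> {
    void onAdded(List<T> resources);

    void onModified(List<T> resources);

    void onDeleted(List<T> resources);

    void onError(List<T> resources);

    void handleError(Throwable throwable);
}

/** Hypothetical reusable test handler: each callback delegates to a configurable consumer. */
final class TestingWatchCallbackHandler<T> implements WatchCallbackHandler<T> {
    private final Consumer<List<T>> onAdded;
    private final Consumer<List<T>> onModified;
    private final Consumer<List<T>> onDeleted;
    private final Consumer<List<T>> onError;
    private final Consumer<Throwable> handleError;

    private TestingWatchCallbackHandler(Builder<T> b) {
        this.onAdded = b.onAdded;
        this.onModified = b.onModified;
        this.onDeleted = b.onDeleted;
        this.onError = b.onError;
        this.handleError = b.handleError;
    }

    @Override public void onAdded(List<T> resources) { onAdded.accept(resources); }
    @Override public void onModified(List<T> resources) { onModified.accept(resources); }
    @Override public void onDeleted(List<T> resources) { onDeleted.accept(resources); }
    @Override public void onError(List<T> resources) { onError.accept(resources); }
    @Override public void handleError(Throwable throwable) { handleError.accept(throwable); }

    static <T> Builder<T> builder() {
        return new Builder<>();
    }

    /** Builder whose defaults are all no-ops, which replaces a separate no-op handler class. */
    static final class Builder<T> {
        private Consumer<List<T>> onAdded = ignored -> {};
        private Consumer<List<T>> onModified = ignored -> {};
        private Consumer<List<T>> onDeleted = ignored -> {};
        private Consumer<List<T>> onError = ignored -> {};
        private Consumer<Throwable> handleError = ignored -> {};

        Builder<T> setOnAdded(Consumer<List<T>> c) { this.onAdded = c; return this; }
        Builder<T> setOnModified(Consumer<List<T>> c) { this.onModified = c; return this; }
        Builder<T> setOnDeleted(Consumer<List<T>> c) { this.onDeleted = c; return this; }
        Builder<T> setOnError(Consumer<List<T>> c) { this.onError = c; return this; }
        Builder<T> setHandleError(Consumer<Throwable> c) { this.handleError = c; return this; }

        TestingWatchCallbackHandler<T> build() {
            return new TestingWatchCallbackHandler<>(this);
        }
    }
}
```

   In this shape, `builder().build()` already behaves like a no-op handler, and a test that only cares about added pods would call `setOnAdded(...)` alone.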



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String clusterId) {
 
     @Override
     public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
-        final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems();
+        final List<Pod> podList =
+                this.internalClient
+                        .pods()
+                        .withLabels(labels)
+                        .list(new ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   +1 for this.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
   This duration should also be extracted into a constant.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -68,9 +70,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
+import static org.junit.Assert.assertTrue;
 
 /** Tests for Fabric implementation of {@link FlinkKubeClient}. */
 public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
+

Review Comment:
   Why was this blank line added?



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
                 .isEmpty();
     }
 
+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(TESTING_LABELS);
+        // the count latch for events.
+        final CountDownLatch eventLatch = new CountDownLatch(4);
+        this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
   I agree that we should also check all received events. IMHO, we can 
introduce three `CompletableFuture<Watch.Action>` to handle this.
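
   The idea of one future per expected event type can be sketched as follows. This is an illustration only: `PodAction` is a local stand-in for the fabric8 action enum, its set of values is an assumption, and `ActionFutures` is a hypothetical helper. Unlike a bare `CountDownLatch`, this lets the test assert which kinds of events arrived, not just how many.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Local stand-in for the watch action enum; the listed values are an assumption. */
enum PodAction { ADDED, MODIFIED, DELETED, ERROR }

/** Hypothetical helper that keeps one future per action and completes it on the first event. */
final class ActionFutures {
    private final Map<PodAction, CompletableFuture<PodAction>> futures =
            new EnumMap<>(PodAction.class);

    ActionFutures() {
        // The map is fully populated here and only read afterwards.
        for (PodAction action : PodAction.values()) {
            futures.put(action, new CompletableFuture<>());
        }
    }

    /** Called from the watch callback; completing an already completed future is a no-op. */
    void onEvent(PodAction action) {
        futures.get(action).complete(action);
    }

    CompletableFuture<PodAction> future(PodAction action) {
        return futures.get(action);
    }

    /** Completes once every listed action has been observed at least once. */
    CompletableFuture<Void> all(PodAction... actions) {
        return CompletableFuture.allOf(
                Arrays.stream(actions).map(futures::get).toArray(CompletableFuture[]::new));
    }
}
```

   A test could then call `all(...)` and wait on the result with a timeout, and report exactly which action never arrived by checking the individual futures.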



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
