reswqa commented on code in PR #21527: URL: https://github.com/apache/flink/pull/21527#discussion_r1160799424
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String clusterId) {
     @Override
     public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
-        final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems();
+        final List<Pod> podList =
+                this.internalClient
+                        .pods()
+                        .withLabels(labels)
+                        .list(new ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
```suggestion
                        .list(new ListOptionsBuilder().withResourceVersion("0").build())
```
We should add a comment explaining this magic number.

##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -233,6 +239,7 @@ public KubernetesWatch watchPodsAndDoCallback(
                 this.internalClient
                         .pods()
                         .withLabels(labels)
+                        .withResourceVersion("0")

Review Comment:
Refer to the previous comment.

##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }

+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
I'm not very familiar with this API. Can anyone tell me whether this means we will definitely wait for `1` second, or at most `1` second?
If it is the former, it should definitely be prohibited, because it will seriously increase the execution time of `AZP`.

##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
                 .isEmpty();
     }

+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(TESTING_LABELS);
+        // the count latch for events.
+        final CountDownLatch eventLatch = new CountDownLatch(4);
+        this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
It seems that we have too many classes that implement `FlinkKubeClient.WatchCallbackHandler<T>` only for testing purposes. We should introduce a more general and reusable `TestingKubernetesPodCallbackHandler` as the first commit and rewrite all other implementations such as `NoOpWatchCallbackHandler` and `TestingCallbackHandler`. Don't worry too much: if you don't know how to do this refactoring, I can push this commit as an example.

##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String clusterId) {
     @Override
     public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
-        final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems();
+        final List<Pod> podList =
+                this.internalClient
+                        .pods()
+                        .withLabels(labels)
+                        .list(new ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
+1 for this.
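
For the magic number raised on this hunk, a minimal sketch of one way to document it: pull `"0"` into a named constant whose Javadoc explains the semantics. The class name, the constant name `RESOURCE_VERSION_ANY`, and the helper `watchPodsPath` are hypothetical and not part of the PR; the helper only rebuilds the path that the mock server in `KubernetesClientTestBase` expects, using the JDK alone (Java 11+ assumed).

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ResourceVersionSketch {

    /**
     * Hypothetical constant name. For list and watch requests, Kubernetes treats
     * resourceVersion "0" as "any version": the API server may answer from its
     * cache, so the result can be slightly stale but is typically cheaper to serve.
     */
    static final String RESOURCE_VERSION_ANY = "0";

    /** Builds the watch path that the mocked API server in the test expects. */
    static String watchPodsPath(String namespace, String labelSelector) {
        return "/api/v1/namespaces/" + namespace + "/pods"
                + "?labelSelector=" + URLEncoder.encode(labelSelector, StandardCharsets.UTF_8)
                + "&resourceVersion=" + RESOURCE_VERSION_ANY
                + "&allowWatchBookmarks=true&watch=true";
    }

    public static void main(String[] args) {
        System.out.println(watchPodsPath("test", "label1=value1,label2=value2"));
    }
}
```

In the production code the same constant could be passed to `withResourceVersion(...)` in both `getPodsWithLabels` and `watchPodsAndDoCallback`, so the explanation lives in one place.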
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected error").always();
     }

+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
This wait time should also be extracted into a constant.

##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -68,9 +70,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
+import static org.junit.Assert.assertTrue;

 /** Tests for Fabric implementation of {@link FlinkKubeClient}. */
 public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
+

Review Comment:
Why add this new line?

##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
                 .isEmpty();
     }

+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(TESTING_LABELS);
+        // the count latch for events.
+        final CountDownLatch eventLatch = new CountDownLatch(4);
+        this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
I agree that we should also check all received events. IMHO, we can introduce three `CompletableFuture<Watch.Action>` to handle this.
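
A rough sketch of the future-per-event idea, written against the JDK only so it runs on its own (Java 11+ assumed). The `Action` enum and the `PodCallbackHandler` interface are reduced stand-ins for the fabric8 action type and `FlinkKubeClient.WatchCallbackHandler<T>`; `RecordingHandler` and its field names are hypothetical. In the real test the mocked watch would drive the callbacks instead of `main`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class WatchActionFuturesSketch {

    /** Stand-in for the watch action enum; the real test would use the fabric8 type. */
    enum Action { ADDED, MODIFIED, DELETED }

    /** Reduced stand-in for FlinkKubeClient.WatchCallbackHandler<T>. */
    interface PodCallbackHandler<T> {
        void onAdded(List<T> pods);
        void onModified(List<T> pods);
        void onDeleted(List<T> pods);
    }

    /** Completes one future per event kind so a test can assert on each event separately. */
    static final class RecordingHandler<T> implements PodCallbackHandler<T> {
        final CompletableFuture<Action> added = new CompletableFuture<>();
        final CompletableFuture<Action> modified = new CompletableFuture<>();
        final CompletableFuture<Action> deleted = new CompletableFuture<>();

        @Override
        public void onAdded(List<T> pods) {
            added.complete(Action.ADDED);
        }

        @Override
        public void onModified(List<T> pods) {
            modified.complete(Action.MODIFIED);
        }

        @Override
        public void onDeleted(List<T> pods) {
            deleted.complete(Action.DELETED);
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingHandler<String> handler = new RecordingHandler<>();
        // Simulate the events that the mocked watch would deliver.
        handler.onAdded(List.of("tm_pod1"));
        handler.onModified(List.of("tm_pod1"));
        handler.onDeleted(List.of("tm_pod1"));
        // A bounded get() fails the test instead of hanging if an event never arrives.
        System.out.println(handler.added.get(1, TimeUnit.SECONDS));
        System.out.println(handler.modified.get(1, TimeUnit.SECONDS));
        System.out.println(handler.deleted.get(1, TimeUnit.SECONDS));
    }
}
```

Compared with a single `CountDownLatch`, separate futures show which event kind is missing when the test fails. A fourth future for error events could be added in the same way if that event should be checked as well.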