huwh commented on code in PR #21527: URL: https://github.com/apache/flink/pull/21527#discussion_r1160572180
########## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java: ########## @@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String clusterId) { @Override public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) { - final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems(); + final List<Pod> podList = + this.internalClient + .pods() + .withLabels(labels) + .list(new ListOptionsBuilder().withResourceVersion("0").build()) Review Comment: This "0" could be extracted as a static final variable ########## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java: ########## @@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() { server.expect().get().withPath(path).andReturn(500, "Expected error").always(); } + protected void mockPodEventWithLabels(Map<String, String> labels) { + final Pod pod1 = Review Comment: maybe "pod" is enough, since there is only one pod here ########## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java: ########## @@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() { server.expect().get().withPath(path).andReturn(500, "Expected error").always(); } + protected void mockPodEventWithLabels(Map<String, String> labels) { Review Comment: It's better to move the namespace, name, resource version to function arguments. ########## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java: ########## @@ -277,6 +281,23 @@ void testStopPod() throws ExecutionException, InterruptedException { assertThat(this.kubeClient.pods().inNamespace(NAMESPACE).withName(podName).get()).isNull(); } + @Test + void testGetPodsWithLabels() { + final String podName = "pod-with-labels"; + final Pod pod = + new PodBuilder() + .editOrNewMetadata() + .withName(podName) + .withLabels(TESTING_LABELS) + .endMetadata() + .editOrNewSpec() + .endSpec() + .build(); + this.kubeClient.pods().inNamespace(NAMESPACE).create(pod); + List<KubernetesPod> kubernetesPods = this.flinkKubeClient.getPodsWithLabels(TESTING_LABELS); + assertThat(kubernetesPods.size()).isEqualTo(1); Review Comment: Should check the list pod is excepted one assertThat(kubernetesPods) .satisfiesExactly( kubernetesPod -> assertThat(kubernetesPod.getName()).isEqualTo(podName)); ########## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java: ########## @@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception { .isEmpty(); } + @Test + void testWatchPodsAndDoCallback() throws Exception { + mockPodEventWithLabels(TESTING_LABELS); + // the count latch for events. + final CountDownLatch eventLatch = new CountDownLatch(4); + this.flinkKubeClient.watchPodsAndDoCallback( Review Comment: It's better to check each event is received. Maybe you can use three CountDownLatch and a anonymous classes here ########## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java: ########## @@ -630,4 +661,39 @@ private KubernetesConfigMap buildTestingConfigMap() { .withData(data) .build()); } + + private class TestingKubernetesPodCallbackHandler + implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> { + + private final CountDownLatch eventLatch; + + public TestingKubernetesPodCallbackHandler(CountDownLatch eventLatch) { + this.eventLatch = eventLatch; + } + + @Override + public void onAdded(List<KubernetesPod> resources) { + this.eventLatch.countDown(); + } + + @Override + public void onModified(List<KubernetesPod> resources) { + this.eventLatch.countDown(); + } + + @Override + public void onDeleted(List<KubernetesPod> resources) { + this.eventLatch.countDown(); + } + + @Override + public void onError(List<KubernetesPod> resources) { Review Comment: ERROR event will not trigger Watcher#eventReceived, so this unit test will failed. Maybe we can just skip this in unit test. ref: https://github.com/fabric8io/kubernetes-client/blob/master/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java#L325 ########## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java: ########## @@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() { server.expect().get().withPath(path).andReturn(500, "Expected error").always(); } + protected void mockPodEventWithLabels(Map<String, String> labels) { + final Pod pod1 = + new PodBuilder() + .withNewMetadata() + .withNamespace("test") + .withName("tm_pod1") + .withLabels(labels) + .withResourceVersion("5668") + .endMetadata() + .build(); + // mock four kinds of events. + server.expect() + .withPath( + "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true") Review Comment: This path can be formatted using String.format. ########## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java: ########## @@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() { server.expect().get().withPath(path).andReturn(500, "Expected error").always(); } + protected void mockPodEventWithLabels(Map<String, String> labels) { + final Pod pod1 = + new PodBuilder() + .withNewMetadata() + .withNamespace("test") + .withName("tm_pod1") + .withLabels(labels) + .withResourceVersion("5668") + .endMetadata() + .build(); + // mock four kinds of events. + server.expect() + .withPath( + "/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(1000) Review Comment: 1000 is a bit long ########## flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java: ########## @@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception { .isEmpty(); } + @Test + void testWatchPodsAndDoCallback() throws Exception { + mockPodEventWithLabels(TESTING_LABELS); + // the count latch for events. + final CountDownLatch eventLatch = new CountDownLatch(4); + this.flinkKubeClient.watchPodsAndDoCallback( + TESTING_LABELS, new TestingKubernetesPodCallbackHandler(eventLatch)); + assertTrue(eventLatch.await(10, TimeUnit.SECONDS)); Review Comment: We need use junit5 and Assertj in unit tests. assertThat(eventLatch.await(1000, TimeUnit.MILLISECONDS)).isTrue(); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org