huwh commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1160572180


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String 
clusterId) {
 
     @Override
     public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
-        final List<Pod> podList = 
this.internalClient.pods().withLabels(labels).list().getItems();
+        final List<Pod> podList =
+                this.internalClient
+                        .pods()
+                        .withLabels(labels)
+                        .list(new 
ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   This "0" could be extracted as a static final variable



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected 
error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =

Review Comment:
   maybe "pod" is enough, since there is only one pod here



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected 
error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {

Review Comment:
   It's better to move the namespace, name, resource version to function 
arguments.
   



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -277,6 +281,23 @@ void testStopPod() throws ExecutionException, 
InterruptedException {
         
assertThat(this.kubeClient.pods().inNamespace(NAMESPACE).withName(podName).get()).isNull();
     }
 
+    @Test
+    void testGetPodsWithLabels() {
+        final String podName = "pod-with-labels";
+        final Pod pod =
+                new PodBuilder()
+                        .editOrNewMetadata()
+                        .withName(podName)
+                        .withLabels(TESTING_LABELS)
+                        .endMetadata()
+                        .editOrNewSpec()
+                        .endSpec()
+                        .build();
+        this.kubeClient.pods().inNamespace(NAMESPACE).create(pod);
+        List<KubernetesPod> kubernetesPods = 
this.flinkKubeClient.getPodsWithLabels(TESTING_LABELS);
+        assertThat(kubernetesPods.size()).isEqualTo(1);

Review Comment:
   Should check the list pod is excepted one
   
           assertThat(kubernetesPods)
                   .satisfiesExactly(
                           kubernetesPod -> 
assertThat(kubernetesPod.getName()).isEqualTo(podName));



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
                 .isEmpty();
     }
 
+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(TESTING_LABELS);
+        // the count latch for events.
+        final CountDownLatch eventLatch = new CountDownLatch(4);
+        this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
   It's better to check each event is received.
   
   Maybe you can use three CountDownLatch and a anonymous classes here 



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -630,4 +661,39 @@ private KubernetesConfigMap buildTestingConfigMap() {
                         .withData(data)
                         .build());
     }
+
+    private class TestingKubernetesPodCallbackHandler
+            implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {
+
+        private final CountDownLatch eventLatch;
+
+        public TestingKubernetesPodCallbackHandler(CountDownLatch eventLatch) {
+            this.eventLatch = eventLatch;
+        }
+
+        @Override
+        public void onAdded(List<KubernetesPod> resources) {
+            this.eventLatch.countDown();
+        }
+
+        @Override
+        public void onModified(List<KubernetesPod> resources) {
+            this.eventLatch.countDown();
+        }
+
+        @Override
+        public void onDeleted(List<KubernetesPod> resources) {
+            this.eventLatch.countDown();
+        }
+
+        @Override
+        public void onError(List<KubernetesPod> resources) {

Review Comment:
   ERROR event will not trigger Watcher#eventReceived, so this unit test will 
failed.
   
   Maybe we can just skip this in unit test.
   
   ref: 
https://github.com/fabric8io/kubernetes-client/blob/master/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java#L325



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected 
error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        
"/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")

Review Comment:
   This path can be formatted using String.format.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
         server.expect().get().withPath(path).andReturn(500, "Expected 
error").always();
     }
 
+    protected void mockPodEventWithLabels(Map<String, String> labels) {
+        final Pod pod1 =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace("test")
+                        .withName("tm_pod1")
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock four kinds of events.
+        server.expect()
+                .withPath(
+                        
"/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(1000)

Review Comment:
   1000 is a bit long



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
                 .isEmpty();
     }
 
+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(TESTING_LABELS);
+        // the count latch for events.
+        final CountDownLatch eventLatch = new CountDownLatch(4);
+        this.flinkKubeClient.watchPodsAndDoCallback(
+                TESTING_LABELS, new 
TestingKubernetesPodCallbackHandler(eventLatch));
+        assertTrue(eventLatch.await(10, TimeUnit.SECONDS));

Review Comment:
   We need use junit5 and Assertj in unit tests. 
   
   assertThat(eventLatch.await(1000, TimeUnit.MILLISECONDS)).isTrue();



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to