reswqa commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1160799424
##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -191,7 +192,12 @@ public Optional getRestEndpoint(String clusterId) {
@Override
public List getPodsWithLabels(Map labels) {
-final List podList =
this.internalClient.pods().withLabels(labels).list().getItems();
+final List podList =
+this.internalClient
+.pods()
+.withLabels(labels)
+.list(new ListOptionsBuilder().withResourceVersion("0").build())
Review Comment:
```suggestion
.list(new ListOptionsBuilder().withResourceVersion("0").build())
```
We should add a comment explaining this magic number.
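As a rough sketch of what such a comment could say: in the Kubernetes API, a list or watch request with `resourceVersion=0` accepts data at any resource version, so the API server may answer from its cache and the result can be stale. The class, constant, and method names below are hypothetical, and the fabric8 calls are left out so that the snippet stands alone.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical holder for the documented constant; not part of Flink or fabric8. */
final class ResourceVersionSketch {

    /**
     * Resource version "0" tells the Kubernetes API server that data at any resource
     * version is acceptable. The server may then answer from its cache instead of
     * doing a consistent read, which lowers the load but can return stale data.
     */
    static final String KUBERNETES_ZERO_RESOURCE_VERSION = "0";

    /** Builds the query parameters of a pod list request (illustrative helper). */
    static Map<String, String> listQueryParams(String labelSelector) {
        final Map<String, String> params = new LinkedHashMap<>();
        params.put("labelSelector", labelSelector);
        params.put("resourceVersion", KUBERNETES_ZERO_RESOURCE_VERSION);
        return params;
    }
}
```

With a named constant, both the list call and the watch call can refer to the same documented value instead of repeating the literal `"0"`.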
##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -233,6 +239,7 @@ public KubernetesWatch watchPodsAndDoCallback(
this.internalClient
.pods()
.withLabels(labels)
+.withResourceVersion("0")
Review Comment:
See the previous comment about documenting this magic number.
##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
server.expect().get().withPath(path).andReturn(500, "Expected error").always();
}
+protected void mockPodEventWithLabels(Map labels) {
+final Pod pod1 =
+new PodBuilder()
+.withNewMetadata()
+.withNamespace("test")
+.withName("tm_pod1")
+.withLabels(labels)
+.withResourceVersion("5668")
+.endMetadata()
+.build();
+// mock four kinds of events.
+server.expect()
+.withPath(
+"/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2=0=true=true")
+.andUpgradeToWebSocket()
+.open()
+.waitFor(1000)
Review Comment:
I'm not very familiar with this API. Can anyone tell me whether this means we
will always wait for `1` second, or at most `1` second? If it is the former,
it should definitely be avoided, because it will seriously slow down test
execution on `AZP`.
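To make the two readings concrete, here is a minimal sketch using only the JDK. It does not settle which behavior the mock server's `waitFor` has; it only contrasts a fixed delay with a bounded wait. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of "always wait" versus "wait at most". */
final class WaitSemanticsSketch {

    /** Fixed delay: blocks for the full duration and returns the elapsed millis. */
    static long fixedWait(long millis) {
        final long start = System.nanoTime();
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Bounded wait: returns true as soon as the latch reaches zero, false on timeout. */
    static boolean boundedWait(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A test that waits on a latch with a timeout only pays the full timeout when something goes wrong, whereas a fixed delay costs the full duration on every run.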
##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
.isEmpty();
}
+@Test
+void testWatchPodsAndDoCallback() throws Exception {
+mockPodEventWithLabels(TESTING_LABELS);
+// the count latch for events.
+final CountDownLatch eventLatch = new CountDownLatch(4);
+this.flinkKubeClient.watchPodsAndDoCallback(
Review Comment:
It seems that we have too many classes implementing
`FlinkKubeClient.WatchCallbackHandler` for testing purposes only. We'd better
introduce a more general and reusable `TestingKubernetesPodCallbackHandler` as
the first commit and rewrite the other implementations, such as
`NoOpWatchCallbackHandler` and `TestingCallbackHandler`, on top of it.
Don't worry if you are not sure how to do this refactoring; I can push
this commit as an example.
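One possible shape for such a reusable handler, as a rough sketch only: each callback is a replaceable consumer that defaults to a no-op, so a test configures just the callbacks it cares about. The interface below is a local stand-in for `FlinkKubeClient.WatchCallbackHandler`, and its method set, along with every class and method name here, is an assumption for illustration.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

final class CallbackHandlerSketch {

    /** Local stand-in for the real handler interface; the method set is assumed. */
    public interface WatchCallbackHandler<T> {
        void onAdded(List<T> resources);
        void onModified(List<T> resources);
        void onDeleted(List<T> resources);
        void onError(List<T> resources);
        void handleError(Throwable throwable);
    }

    /** Single configurable test handler; every callback defaults to a no-op. */
    public static final class TestingHandler<T> implements WatchCallbackHandler<T> {
        private Consumer<List<T>> addedConsumer = ignored -> {};
        private Consumer<List<T>> modifiedConsumer = ignored -> {};
        private Consumer<List<T>> deletedConsumer = ignored -> {};
        private Consumer<List<T>> errorConsumer = ignored -> {};
        private Consumer<Throwable> fatalErrorConsumer = ignored -> {};

        public TestingHandler<T> withOnAdded(Consumer<List<T>> c) { addedConsumer = c; return this; }
        public TestingHandler<T> withOnModified(Consumer<List<T>> c) { modifiedConsumer = c; return this; }
        public TestingHandler<T> withOnDeleted(Consumer<List<T>> c) { deletedConsumer = c; return this; }
        public TestingHandler<T> withOnError(Consumer<List<T>> c) { errorConsumer = c; return this; }
        public TestingHandler<T> withHandleError(Consumer<Throwable> c) { fatalErrorConsumer = c; return this; }

        @Override public void onAdded(List<T> resources) { addedConsumer.accept(resources); }
        @Override public void onModified(List<T> resources) { modifiedConsumer.accept(resources); }
        @Override public void onDeleted(List<T> resources) { deletedConsumer.accept(resources); }
        @Override public void onError(List<T> resources) { errorConsumer.accept(resources); }
        @Override public void handleError(Throwable throwable) { fatalErrorConsumer.accept(throwable); }
    }

    /** Convenience factory: counts the latch down once per resource event of any kind. */
    public static <T> TestingHandler<T> countingHandler(CountDownLatch latch) {
        final Consumer<List<T>> countDown = ignored -> latch.countDown();
        return new TestingHandler<T>()
                .withOnAdded(countDown)
                .withOnModified(countDown)
                .withOnDeleted(countDown)
                .withOnError(countDown);
    }
}
```

With this shape, a no-op handler is simply `new TestingHandler<>()`, and a test that expects four events can pass `countingHandler(new CountDownLatch(4))` instead of defining another handler class.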
##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -191,7 +192,12 @@ public Optional getRestEndpoint(String clusterId) {
@Override
public List getPodsWithLabels(Map labels) {
-final List podList =
this.internalClient.pods().withLabels(labels).list().getItems();
+final List podList =
+this.internalClient
+.pods()
+.withLabels(labels)
+.list(new ListOptionsBuilder().withResourceVersion("0").build())
Review Comment:
+1 for this.
##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
server.expect().get().withPath(path).andReturn(500, "Expected error").always();
}
+protected void mockPodEventWithLabels(Map labels) {
+final Pod pod1 =
+new PodBuilder()
+.withNewMetadata()
+.withNamespace("test")
+.withName("tm_pod1")
+.withLabels(labels)
+.withResourceVersion("5668")
+