[GitHub] [flink] reswqa commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

2023-05-30 Thread via GitHub


reswqa commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1211033434


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java:
##
@@ -115,4 +115,9 @@ public class Constants {
 public static final String KUBERNETES_TASK_MANAGER_SCRIPT_PATH = "kubernetes-taskmanager.sh";
 
 public static final String ENV_TM_JVM_MEM_OPTS = "FLINK_TM_JVM_MEM_OPTS";
+
+// "resourceVersion="0" is means "Any".  Return data at any resource version.It saves time to

Review Comment:
   ```suggestion
   // resourceVersion="0" means any resource version. It saves time to
   ```



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java:
##
@@ -65,7 +69,13 @@ void testClosingWithException() {
 void testCallbackHandler() {
 FlinkPod pod = new FlinkPod.Builder().build();
 final KubernetesPodsWatcher podsWatcher =
-new KubernetesPodsWatcher(new TestingCallbackHandler(e -> {}));
+new KubernetesPodsWatcher(
+TestingWatchCallbackHandler.builder()
+.setOnAddedConsumer(pods -> podAddedList.addAll(pods))
+.setOnModifiedConsumer(pods -> podModifiedList.addAll(pods))
+.setOnDeletedConsumer(pods -> podDeletedList.addAll(pods))
+.setOnErrorConsumer(pods -> podErrorList.addAll(pods))

Review Comment:
   It seems that these 4 lists are only used in this test case. I'd suggest moving their declarations into this method.






[GitHub] [flink] reswqa commented on a diff in pull request #21527: [FLINK-27925] [kubernetes]Performance optimization when watch tm pod and list pod.

2023-04-07 Thread via GitHub


reswqa commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1160799424


##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -191,7 +192,12 @@ public Optional getRestEndpoint(String clusterId) {
 
 @Override
 public List getPodsWithLabels(Map labels) {
-final List podList = this.internalClient.pods().withLabels(labels).list().getItems();
+final List podList =
+this.internalClient
+.pods()
+.withLabels(labels)
+.list(new ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   ```suggestion
   .list(new ListOptionsBuilder().withResourceVersion("0").build())
   ```
   We'd better add a comment explaining this magic number.
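
One way to document the magic number is a named constant with a comment. The following is a minimal sketch only: the class name, the constant name `ANY_RESOURCE_VERSION`, and the `listPodsQuery` helper are hypothetical and use the JDK alone, not the fabric8 client. It assumes the Kubernetes semantics that `resourceVersion="0"` on a list or watch request means "any", so the API server may answer from its cache.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class ResourceVersionSketch {

    /**
     * Hypothetical constant name. For list and watch requests, resourceVersion "0" means
     * "any": the API server may return data at any resource version, typically from its cache.
     */
    static final String ANY_RESOURCE_VERSION = "0";

    /** Builds the query string of a pod list request; illustrative only, not fabric8 code. */
    static String listPodsQuery(Map<String, String> labels) {
        String selector =
                labels.entrySet().stream()
                        .map(e -> e.getKey() + "=" + e.getValue())
                        .collect(Collectors.joining(","));
        return "labelSelector="
                + URLEncoder.encode(selector, StandardCharsets.UTF_8)
                + "&resourceVersion="
                + ANY_RESOURCE_VERSION;
    }

    public static void main(String[] args) {
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("label1", "value1");
        labels.put("label2", "value2");
        System.out.println(listPodsQuery(labels));
    }
}
```

With a constant like this, the call sites can pass the named value instead of the bare `"0"`, and the explanation lives in one place.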



##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -233,6 +239,7 @@ public KubernetesWatch watchPodsAndDoCallback(
 this.internalClient
 .pods()
 .withLabels(labels)
+.withResourceVersion("0")

Review Comment:
   Same as the previous comment: this magic number needs an explanatory comment.



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
 server.expect().get().withPath(path).andReturn(500, "Expected error").always();
 }
 
+protected void mockPodEventWithLabels(Map labels) {
+final Pod pod1 =
+new PodBuilder()
+.withNewMetadata()
+.withNamespace("test")
+.withName("tm_pod1")
+.withLabels(labels)
+.withResourceVersion("5668")
+.endMetadata()
+.build();
+// mock four kinds of events.
+server.expect()
+.withPath(
+"/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+.andUpgradeToWebSocket()
+.open()
+.waitFor(1000)

Review Comment:
   I'm not very familiar with this API. Can anyone tell me whether this means we will always wait for `1` second, or at most `1` second? If it is the former, it should definitely be avoided, because it will seriously slow down the execution time of `AZP`.
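
To make the distinction concrete, here is a minimal sketch of "at most" semantics on the test side, using only the JDK. It says nothing about how the mock server's `waitFor` behaves; the class and method names are hypothetical. A bounded `CountDownLatch.await` returns as soon as all events have arrived, whereas a fixed `Thread.sleep(1000)` would always cost the full second.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class BoundedWaitSketch {

    /** Returns true as soon as the latch reaches zero, or false once the timeout expires. */
    static boolean awaitEvents(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that not all events were seen.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch eventLatch = new CountDownLatch(4);
        // Stand-in for a watcher thread that delivers four events right away.
        new Thread(() -> {
            for (int i = 0; i < 4; i++) {
                eventLatch.countDown();
            }
        }).start();

        long start = System.nanoTime();
        boolean allSeen = awaitEvents(eventLatch, 1000);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("all events seen: " + allSeen + ", waited " + elapsedMillis + " ms");
    }
}
```

The timeout here is only an upper bound, so a passing test does not pay for it.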



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
 .isEmpty();
 }
 
+@Test
+void testWatchPodsAndDoCallback() throws Exception {
+mockPodEventWithLabels(TESTING_LABELS);
+// the count latch for events.
+final CountDownLatch eventLatch = new CountDownLatch(4);
+this.flinkKubeClient.watchPodsAndDoCallback(

Review Comment:
   It seems that we have too many classes that implement `FlinkKubeClient.WatchCallbackHandler` only for testing purposes. We'd better introduce a more general and reusable `TestingKubernetesPodCallbackHandler` as the first commit, and rewrite all the other implementations, such as `NoOpWatchCallbackHandler` and `TestingCallbackHandler`, on top of it.
   
   Don't worry too much: if you don't know how to do this refactoring, I can push this commit as an example.
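
As a rough illustration of what such a reusable handler could look like, here is a minimal builder-based sketch. Everything in it is hypothetical: the stand-in interface is a simplified version of the real callback interface, and the class and setter names are placeholders, not the code that will be pushed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified stand-in for the real callback interface; an assumption for this sketch. */
interface WatchCallbackHandlerSketch<T> {
    void onAdded(List<T> resources);
    void onModified(List<T> resources);
    void onDeleted(List<T> resources);
    void onError(List<T> resources);
}

/** Hypothetical reusable test handler: callbacks that are not set default to no-ops. */
final class TestingHandlerSketch<T> implements WatchCallbackHandlerSketch<T> {
    private final Consumer<List<T>> onAdded;
    private final Consumer<List<T>> onModified;
    private final Consumer<List<T>> onDeleted;
    private final Consumer<List<T>> onError;

    private TestingHandlerSketch(Builder<T> b) {
        this.onAdded = b.onAdded;
        this.onModified = b.onModified;
        this.onDeleted = b.onDeleted;
        this.onError = b.onError;
    }

    @Override public void onAdded(List<T> resources) { onAdded.accept(resources); }
    @Override public void onModified(List<T> resources) { onModified.accept(resources); }
    @Override public void onDeleted(List<T> resources) { onDeleted.accept(resources); }
    @Override public void onError(List<T> resources) { onError.accept(resources); }

    static <T> Builder<T> builder() { return new Builder<>(); }

    static final class Builder<T> {
        private Consumer<List<T>> onAdded = ignored -> {};
        private Consumer<List<T>> onModified = ignored -> {};
        private Consumer<List<T>> onDeleted = ignored -> {};
        private Consumer<List<T>> onError = ignored -> {};

        Builder<T> setOnAddedConsumer(Consumer<List<T>> c) { this.onAdded = c; return this; }
        Builder<T> setOnModifiedConsumer(Consumer<List<T>> c) { this.onModified = c; return this; }
        Builder<T> setOnDeletedConsumer(Consumer<List<T>> c) { this.onDeleted = c; return this; }
        Builder<T> setOnErrorConsumer(Consumer<List<T>> c) { this.onError = c; return this; }

        TestingHandlerSketch<T> build() { return new TestingHandlerSketch<>(this); }
    }

    public static void main(String[] args) {
        // The recording list is local to the test that needs it.
        List<String> added = new ArrayList<>();
        TestingHandlerSketch<String> handler =
                TestingHandlerSketch.<String>builder()
                        .setOnAddedConsumer(pods -> added.addAll(pods))
                        .build();
        handler.onAdded(List.of("tm_pod1"));
        handler.onDeleted(List.of("tm_pod1")); // no-op: no consumer was registered
        System.out.println(added); // prints [tm_pod1]
    }
}
```

A no-op handler is then just `builder().build()`, and a test that only cares about one event type sets a single consumer, so the special-purpose test classes are no longer needed.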



##
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##
@@ -191,7 +192,12 @@ public Optional getRestEndpoint(String clusterId) {
 
 @Override
 public List getPodsWithLabels(Map labels) {
-final List podList = this.internalClient.pods().withLabels(labels).list().getItems();
+final List podList =
+this.internalClient
+.pods()
+.withLabels(labels)
+.list(new ListOptionsBuilder().withResourceVersion("0").build())

Review Comment:
   +1 for this.



##
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
 server.expect().get().withPath(path).andReturn(500, "Expected error").always();
 }
 
+protected void mockPodEventWithLabels(Map labels) {
+final Pod pod1 =
+new PodBuilder()
+.withNewMetadata()
+.withNamespace("test")
+.withName("tm_pod1")
+.withLabels(labels)
+.withResourceVersion("5668")
+