[GitHub] [flink] rkhachatryan commented on a change in pull request #14528: [FLINK-20615] Clean PartitionRequestClientFactory up if createPartitionRequestClient fails

2020-12-30 Thread GitBox


rkhachatryan commented on a change in pull request #14528:
URL: https://github.com/apache/flink/pull/14528#discussion_r550292871



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##
@@ -69,36 +68,34 @@
 NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
 throws IOException, InterruptedException {
 while (true) {
-AtomicBoolean isTheFirstOne = new AtomicBoolean(false);
-CompletableFuture clientFuture =
-clients.computeIfAbsent(
-connectionId,
-unused -> {
-isTheFirstOne.set(true);
-return new CompletableFuture<>();
-});
-if (isTheFirstOne.get()) {
+final CompletableFuture newClientFuture =
+new CompletableFuture<>();
+
+final CompletableFuture clientFuture =
+clients.putIfAbsent(connectionId, newClientFuture);
+
+final NettyPartitionRequestClient client;
+
+if (clientFuture == null) {
 try {
-clientFuture.complete(connectWithRetries(connectionId));
-} catch (InterruptedException e) {
-clientFuture.complete(null); // let others waiting know that they should retry
+client = connectWithRetries(connectionId);
+} catch (RemoteTransportException | InterruptedException e) {

Review comment:
   I think we should handle any exception here. Otherwise, other threads waiting on this future will block indefinitely (and new threads will receive the same cached, never-completed future).
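A minimal sketch of the handling suggested above, under simplifying assumptions: the cache is keyed by `String` instead of `ConnectionID`, a hypothetical `Connector` stands in for `connectWithRetries`, and the retry loop of the real method is omitted. The first caller catches any `Throwable`, completes the shared future exceptionally so waiting threads are released, and removes the entry so that new callers retry instead of receiving a cached failed future.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

/** Hypothetical, simplified cache illustrating the "first caller connects" pattern. */
final class ClientCacheSketch {

    /** Hypothetical stand-in for connectWithRetries; may throw anything. */
    interface Connector {
        String connect(String key) throws Exception;
    }

    private final ConcurrentMap<String, CompletableFuture<String>> clients =
            new ConcurrentHashMap<>();
    private final Connector connector;

    ClientCacheSketch(Connector connector) {
        this.connector = connector;
    }

    String get(String key) throws IOException, InterruptedException {
        CompletableFuture<String> newFuture = new CompletableFuture<>();
        CompletableFuture<String> existing = clients.putIfAbsent(key, newFuture);
        if (existing == null) {
            // This thread won the race and must complete the future on every path.
            try {
                String client = connector.connect(key);
                newFuture.complete(client);
                return client;
            } catch (Throwable t) {
                // Release threads already blocked on this future with the failure.
                newFuture.completeExceptionally(t);
                // Drop the failed entry (only if it is still ours) so later callers retry.
                clients.remove(key, newFuture);
                if (t instanceof Error) throw (Error) t;
                if (t instanceof InterruptedException) throw (InterruptedException) t;
                if (t instanceof IOException) throw (IOException) t;
                throw new IOException(t);
            }
        }
        // Another thread is (or was) connecting: wait for its result.
        try {
            return existing.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) throw (IOException) cause;
            throw new IOException(cause);
        }
    }
}
```

The two-argument `ConcurrentMap.remove(key, value)` only removes the entry if it still maps to this thread's future, so it cannot discard a future inserted by someone else.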





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #14528: [FLINK-20615] Clean PartitionRequestClientFactory up if createPartitionRequestClient fails

2020-12-31 Thread GitBox


rkhachatryan commented on a change in pull request #14528:
URL: https://github.com/apache/flink/pull/14528#discussion_r550447001



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##
@@ -69,36 +68,34 @@
 NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
 throws IOException, InterruptedException {
 while (true) {
-AtomicBoolean isTheFirstOne = new AtomicBoolean(false);
-CompletableFuture clientFuture =
-clients.computeIfAbsent(
-connectionId,
-unused -> {
-isTheFirstOne.set(true);
-return new CompletableFuture<>();
-});
-if (isTheFirstOne.get()) {
+final CompletableFuture newClientFuture =
+new CompletableFuture<>();
+
+final CompletableFuture clientFuture =
+clients.putIfAbsent(connectionId, newClientFuture);
+
+final NettyPartitionRequestClient client;
+
+if (clientFuture == null) {
 try {
-clientFuture.complete(connectWithRetries(connectionId));
-} catch (InterruptedException e) {
-clientFuture.complete(null); // let others waiting know that they should retry
+client = connectWithRetries(connectionId);
+} catch (RemoteTransportException | InterruptedException e) {

Review comment:
   I think it will fail the current job, but not necessarily subsequent jobs using the same `ConnectionID`.
   In either case, I would not bake in this assumption: it can change, and it makes the code more difficult to reason about.









[GitHub] [flink] rkhachatryan commented on a change in pull request #14528: [FLINK-20615] Clean PartitionRequestClientFactory up if createPartitionRequestClient fails

2021-01-04 Thread GitBox


rkhachatryan commented on a change in pull request #14528:
URL: https://github.com/apache/flink/pull/14528#discussion_r551382429



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##
@@ -69,36 +68,34 @@
 NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
 throws IOException, InterruptedException {
 while (true) {
-AtomicBoolean isTheFirstOne = new AtomicBoolean(false);
-CompletableFuture clientFuture =
-clients.computeIfAbsent(
-connectionId,
-unused -> {
-isTheFirstOne.set(true);
-return new CompletableFuture<>();
-});
-if (isTheFirstOne.get()) {
+final CompletableFuture newClientFuture =
+new CompletableFuture<>();
+
+final CompletableFuture clientFuture =
+clients.putIfAbsent(connectionId, newClientFuture);
+
+final NettyPartitionRequestClient client;
+
+if (clientFuture == null) {
 try {
-clientFuture.complete(connectWithRetries(connectionId));
-} catch (InterruptedException e) {
-clientFuture.complete(null); // let others waiting know that they should retry
+client = connectWithRetries(connectionId);
+} catch (RemoteTransportException | InterruptedException e) {

Review comment:
   Thanks!
   
   I just spotted another related issue: this exception is re-thrown below (at line 86) without being wrapped.
   
   Previously, this was fine because only `InterruptedException` was re-thrown directly (other exceptions were wrapped after `future.get`).
   Not wrapping it into an `IOException` can break the logic in `RestartPipelinedRegionFailoverStrategy.getTasksNeedingRestart`.
   WDYT?
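To illustrate the concern, a small sketch with hypothetical helpers `asIOException` and `chainContains`: wrapping a non-`IOException` gives callers that expect an `IOException` one, while the original exception stays reachable through `getCause()`. `InterruptedException` is assumed to be re-thrown separately, as the method's `throws` clause allows. This does not reproduce the actual checks in `RestartPipelinedRegionFailoverStrategy`.

```java
import java.io.IOException;

/** Hypothetical helpers illustrating wrapping before re-throwing. */
final class WrapSketch {

    /** Returns t itself if it already is an IOException; otherwise wraps it, keeping t as the cause. */
    static IOException asIOException(Throwable t) {
        return (t instanceof IOException) ? (IOException) t : new IOException(t);
    }

    /** Returns true if any throwable in t's cause chain is an instance of the given type. */
    static boolean chainContains(Throwable t, Class<? extends Throwable> type) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }
}
```

For example, `asIOException(new IllegalStateException("x"))` yields an `IOException` whose cause is the `IllegalStateException`, whereas re-throwing the raw exception leaves no `IOException` anywhere in the chain.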









[GitHub] [flink] rkhachatryan commented on a change in pull request #14528: [FLINK-20615] Clean PartitionRequestClientFactory up if createPartitionRequestClient fails

2021-01-04 Thread GitBox


rkhachatryan commented on a change in pull request #14528:
URL: https://github.com/apache/flink/pull/14528#discussion_r551399086



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##
@@ -69,36 +68,34 @@
 NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
 throws IOException, InterruptedException {
 while (true) {
-AtomicBoolean isTheFirstOne = new AtomicBoolean(false);
-CompletableFuture clientFuture =
-clients.computeIfAbsent(
-connectionId,
-unused -> {
-isTheFirstOne.set(true);
-return new CompletableFuture<>();
-});
-if (isTheFirstOne.get()) {
+final CompletableFuture newClientFuture =
+new CompletableFuture<>();
+
+final CompletableFuture clientFuture =
+clients.putIfAbsent(connectionId, newClientFuture);
+
+final NettyPartitionRequestClient client;
+
+if (clientFuture == null) {
 try {
-clientFuture.complete(connectWithRetries(connectionId));
-} catch (InterruptedException e) {
-clientFuture.complete(null); // let others waiting know that they should retry
+client = connectWithRetries(connectionId);
+} catch (RemoteTransportException | InterruptedException e) {

Review comment:
   I think you're right. However, the other clients (threads) waiting on this future do call `rethrowIOException`.
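A sketch of the waiter side, using a hypothetical `await` helper whose unwrapping policy is an assumption and not a restatement of Flink's `ExceptionUtils.rethrowIOException`. `CompletableFuture.get()` reports a failure wrapped in an `ExecutionException`; unwrapping the cause means an `IOException` raised by the first thread reaches waiters as the very same instance, so both paths surface the same exception.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical waiter path for a shared connection future. */
final class WaiterSketch {

    static <T> T await(CompletableFuture<T> future) throws IOException, InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // Assumed policy: IOExceptions and Errors pass through unchanged, everything else is wrapped.
            if (cause instanceof IOException) throw (IOException) cause;
            if (cause instanceof Error) throw (Error) cause;
            throw new IOException(cause);
        }
    }
}
```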




