Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4470#discussion_r131648638
  
    --- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
 ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.networking;
    +
    +import org.jboss.netty.bootstrap.ClientBootstrap;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelFuture;
    +import org.jboss.netty.channel.ChannelFutureListener;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.ChannelStateEvent;
    +import org.jboss.netty.channel.ExceptionEvent;
    +import org.jboss.netty.channel.MessageEvent;
    +import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
    +
    +import java.net.InetSocketAddress;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.function.Consumer;
    +
    +/**
    + * Handler that is forwarding inbound traffic from the source channel to 
the target channel on remoteHost:remotePort
    + * and the responses in the opposite direction. All of the network traffic 
can be blocked at any time using blocked
    + * flag.
    + */
    +class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
    +   private static final String TARGET_CHANNEL_HANDLER_NAME = 
"target_channel_handler";
    +
    +   // mapping between source and target channels, used for finding correct 
target channel to use for given source.
    +   private final Map<Channel, Channel> sourceToTargetChannels = new 
ConcurrentHashMap<>();
    +   private final Consumer<NetworkFailureHandler> onClose;
    +   private final ClientSocketChannelFactory channelFactory;
    +   private final String remoteHost;
    +   private final int remotePort;
    +
    +   private final AtomicBoolean blocked;
    +
    +   public NetworkFailureHandler(
    +                   AtomicBoolean blocked,
    +                   Consumer<NetworkFailureHandler> onClose,
    +                   ClientSocketChannelFactory channelFactory,
    +                   String remoteHost,
    +                   int remotePort) {
    +           this.blocked = blocked;
    +           this.onClose = onClose;
    +           this.channelFactory = channelFactory;
    +           this.remoteHost = remoteHost;
    +           this.remotePort = remotePort;
    +   }
    +
    +   /**
    +    * Closes the specified channel after all queued write requests are 
flushed.
    +    */
    +   static void closeOnFlush(Channel channel) {
    +           if (channel.isConnected()) {
    +                   
channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    +           }
    +   }
    +
    +   public void closeConnections() {
    +           for (Map.Entry<Channel, Channel> entry : 
sourceToTargetChannels.entrySet()) {
    +                   // target channel is closed on source's channel 
channelClosed even
    +                   entry.getKey().close();
    +           }
    +   }
    +
    +   @Override
    +   public void channelOpen(ChannelHandlerContext context, 
ChannelStateEvent event) throws Exception {
    +           // Suspend incoming traffic until connected to the remote host.
    +           final Channel sourceChannel = event.getChannel();
    +           sourceChannel.setReadable(false);
    +
    +           if (blocked.get()) {
    +                   sourceChannel.close();
    +                   return;
    +           }
    +
    +           // Start the connection attempt.
    +           ClientBootstrap targetConnectionBootstrap = new 
ClientBootstrap(channelFactory);
    +           
targetConnectionBootstrap.getPipeline().addLast(TARGET_CHANNEL_HANDLER_NAME, 
new TargetChannelHandler(event.getChannel(), blocked));
    +           ChannelFuture connectFuture = 
targetConnectionBootstrap.connect(new InetSocketAddress(remoteHost, 
remotePort));
    +           sourceToTargetChannels.put(sourceChannel, 
connectFuture.getChannel());
    +
    +           connectFuture.addListener(future -> {
    +                   if (future.isSuccess()) {
    +                           // Connection attempt succeeded:
    +                           // Begin to accept incoming traffic.
    +                           sourceChannel.setReadable(true);
    +                   } else {
    +                           // Close the connection if the connection 
attempt has failed.
    +                           sourceChannel.close();
    +                   }
    +           });
    +   }
    +
    +   @Override
    +   public void messageReceived(ChannelHandlerContext context, MessageEvent 
event) throws Exception {
    +           if (blocked.get()) {
    +                   return;
    +           }
    +
    +           ChannelBuffer msg = (ChannelBuffer) event.getMessage();
    +           Channel targetChannel = 
sourceToTargetChannels.get(event.getChannel());
    +           if (targetChannel == null) {
    +                   throw new IllegalStateException();
    +           }
    +           targetChannel.write(msg);
    +   }
    +
    +   @Override
    +   public void channelClosed(ChannelHandlerContext context, 
ChannelStateEvent event) throws Exception {
    +           Channel targetChannel = 
sourceToTargetChannels.get(event.getChannel());
    +           if (targetChannel == null) {
    +                   return;
    +           }
    +           closeOnFlush(targetChannel);
    +           sourceToTargetChannels.remove(event.getChannel());
    +           onClose.accept(this);
    +   }
    +
    +   @Override
    +   public void exceptionCaught(ChannelHandlerContext context, 
ExceptionEvent event) throws Exception {
    +           event.getCause().printStackTrace();
    --- End diff --
    
    Ops, missed that when cleaning up a copied example code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to