This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1b4efab4194a583579dfe9597e7ab5b40c9759d8
Author: huangxingbo <h...@apache.org>
AuthorDate: Wed Nov 30 19:00:38 2022 +0800

    [FLINK-29155][python] Port Beam ServerFactory class to flink-python module
    
    This closes #21430.
---
 .../beam/runners/fnexecution/ServerFactory.java    | 296 +++++++++++++++++++++
 1 file changed, 296 insertions(+)

diff --git 
a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
 
b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
new file mode 100644
index 00000000000..7a86c399a0f
--- /dev/null
+++ 
b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution;
+
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.BindableService;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ServerBuilder;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ServerInterceptors;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.netty.NettyServerBuilder;
+import 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.epoll.EpollEventLoopGroup;
+import 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.epoll.EpollServerDomainSocketChannel;
+import 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.epoll.EpollServerSocketChannel;
+import 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.unix.DomainSocketAddress;
+import 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadLocalRandom;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+/** A {@link Server gRPC server} factory. */
+public abstract class ServerFactory {
+
+    private static final int KEEP_ALIVE_TIME_SEC = 19;
+
+    /** Create a default {@link InetSocketAddressServerFactory}. */
+    public static ServerFactory createDefault() {
+        return new InetSocketAddressServerFactory(UrlFactory.createDefault());
+    }
+
+    /** Create a {@link InetSocketAddressServerFactory} that uses the given 
url factory. */
+    public static ServerFactory createWithUrlFactory(UrlFactory urlFactory) {
+        return new InetSocketAddressServerFactory(urlFactory);
+    }
+
+    /** Create a {@link InetSocketAddressServerFactory} that uses ports from a 
supplier. */
+    public static ServerFactory createWithPortSupplier(Supplier<Integer> 
portSupplier) {
+        return new InetSocketAddressServerFactory(UrlFactory.createDefault(), 
portSupplier);
+    }
+
+    /**
+     * Create a {@link InetSocketAddressServerFactory} that uses the given url 
factory and ports
+     * from a supplier.
+     */
+    public static ServerFactory createWithUrlFactoryAndPortSupplier(
+            UrlFactory urlFactory, Supplier<Integer> portSupplier) {
+        return new InetSocketAddressServerFactory(urlFactory, portSupplier);
+    }
+
+    /** Create a {@link EpollSocket}. */
+    public static ServerFactory createEpollSocket() {
+        return new EpollSocket();
+    }
+
+    /** Create a {@link EpollDomainSocket}. */
+    public static ServerFactory createEpollDomainSocket() {
+        return new EpollDomainSocket();
+    }
+
+    /**
+     * Creates an instance of this server using an ephemeral address. The 
allocation of the address
+     * is server type dependent, which means the address may be a port for 
certain type of server,
+     * or a file path for other certain types. The chosen address is 
accessible to the caller from
+     * the URL set in the input {@link 
Endpoints.ApiServiceDescriptor.Builder}. Server applies
+     * {@link GrpcContextHeaderAccessorProvider#interceptor()} to all incoming 
requests.
+     */
+    public abstract Server allocateAddressAndCreate(
+            List<BindableService> services, 
Endpoints.ApiServiceDescriptor.Builder builder)
+            throws IOException;
+
+    /**
+     * Creates an instance of this server at the address specified by the 
given service descriptor
+     * and bound to multiple services. Server applies {@link
+     * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming 
requests.
+     */
+    public abstract Server create(
+            List<BindableService> services, Endpoints.ApiServiceDescriptor 
serviceDescriptor)
+            throws IOException;
+    /**
+     * Creates a {@link Server gRPC Server} using the default server factory.
+     *
+     * <p>The server is created listening any open port on "localhost".
+     */
+    public static class InetSocketAddressServerFactory extends ServerFactory {
+        private final UrlFactory urlFactory;
+        private final Supplier<Integer> portSupplier;
+
+        private InetSocketAddressServerFactory(UrlFactory urlFactory) {
+            this(urlFactory, () -> 0);
+        }
+
+        private InetSocketAddressServerFactory(
+                UrlFactory urlFactory, Supplier<Integer> portSupplier) {
+            this.urlFactory = urlFactory;
+            this.portSupplier = portSupplier;
+        }
+
+        @Override
+        public Server allocateAddressAndCreate(
+                List<BindableService> services,
+                Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+                throws IOException {
+            InetSocketAddress address =
+                    new InetSocketAddress(InetAddress.getLoopbackAddress(), 
portSupplier.get());
+            Server server = createServer(services, address);
+            apiServiceDescriptor.setUrl(
+                    urlFactory.createUrl(address.getHostName(), 
server.getPort()));
+            return server;
+        }
+
+        @Override
+        public Server create(
+                List<BindableService> services, Endpoints.ApiServiceDescriptor 
serviceDescriptor)
+                throws IOException {
+            SocketAddress socketAddress =
+                    
SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+            checkArgument(
+                    socketAddress instanceof InetSocketAddress,
+                    "%s %s requires a host:port socket address, got %s",
+                    getClass().getSimpleName(),
+                    ServerFactory.class.getSimpleName(),
+                    serviceDescriptor.getUrl());
+            return createServer(services, (InetSocketAddress) socketAddress);
+        }
+
+        private static Server createServer(List<BindableService> services, 
InetSocketAddress socket)
+                throws IOException {
+            NettyServerBuilder builder =
+                    NettyServerBuilder.forPort(socket.getPort())
+                            // Set the message size to max value here. The 
actual size is governed
+                            // by the
+                            // buffer size in the layers above.
+                            .maxMessageSize(Integer.MAX_VALUE)
+                            .permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, 
TimeUnit.SECONDS);
+            services.stream()
+                    .forEach(
+                            service ->
+                                    builder.addService(
+                                            ServerInterceptors.intercept(
+                                                    service,
+                                                    
GrpcContextHeaderAccessorProvider
+                                                            .interceptor())));
+            return builder.build().start();
+        }
+    }
+
+    /**
+     * Creates a {@link Server gRPC Server} using a Unix domain socket. Note 
that this requires <a
+     * href="http://netty.io/wiki/forked-tomcat-native.html";>Netty 
TcNative</a> available to be able
+     * to provide a {@link EpollServerDomainSocketChannel}.
+     *
+     * <p>The unix domain socket is located at 
${java.io.tmpdir}/fnapi${random[0-10000)}.sock
+     */
+    private static class EpollDomainSocket extends ServerFactory {
+        private static File chooseRandomTmpFile(int port) {
+            return new File(
+                    System.getProperty("java.io.tmpdir"), 
String.format("fnapi%d.sock", port));
+        }
+
+        @Override
+        public Server allocateAddressAndCreate(
+                List<BindableService> services,
+                Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+                throws IOException {
+            File tmp;
+            do {
+                tmp = 
chooseRandomTmpFile(ThreadLocalRandom.current().nextInt(10000));
+            } while (tmp.exists());
+            apiServiceDescriptor.setUrl("unix://" + tmp.getAbsolutePath());
+            return create(services, apiServiceDescriptor.build());
+        }
+
+        @Override
+        public Server create(
+                List<BindableService> services, Endpoints.ApiServiceDescriptor 
serviceDescriptor)
+                throws IOException {
+            SocketAddress socketAddress =
+                    
SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+            checkArgument(
+                    socketAddress instanceof DomainSocketAddress,
+                    "%s requires a Unix domain socket address, got %s",
+                    EpollDomainSocket.class.getSimpleName(),
+                    serviceDescriptor.getUrl());
+            return createServer(services, (DomainSocketAddress) socketAddress);
+        }
+
+        private static Server createServer(
+                List<BindableService> services, DomainSocketAddress 
domainSocket)
+                throws IOException {
+            NettyServerBuilder builder =
+                    NettyServerBuilder.forAddress(domainSocket)
+                            .channelType(EpollServerDomainSocketChannel.class)
+                            .workerEventLoopGroup(new EpollEventLoopGroup())
+                            .bossEventLoopGroup(new EpollEventLoopGroup())
+                            .maxMessageSize(Integer.MAX_VALUE)
+                            .permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, 
TimeUnit.SECONDS);
+            for (BindableService service : services) {
+                // Wrap the service to extract headers
+                builder.addService(
+                        ServerInterceptors.intercept(
+                                service, 
GrpcContextHeaderAccessorProvider.interceptor()));
+            }
+            return builder.build().start();
+        }
+    }
+
+    /**
+     * Creates a {@link Server gRPC Server} using an Epoll socket. Note that 
this requires <a
+     * href="http://netty.io/wiki/forked-tomcat-native.html";>Netty 
TcNative</a> available to be able
+     * to provide a {@link EpollServerSocketChannel}.
+     *
+     * <p>The server is created listening any open port on "localhost".
+     */
+    private static class EpollSocket extends ServerFactory {
+        @Override
+        public Server allocateAddressAndCreate(
+                List<BindableService> services,
+                Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+                throws IOException {
+            InetSocketAddress address = new 
InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+            Server server = createServer(services, address);
+            apiServiceDescriptor.setUrl(
+                    HostAndPort.fromParts(address.getHostName(), 
server.getPort()).toString());
+            return server;
+        }
+
+        @Override
+        public Server create(
+                List<BindableService> services, Endpoints.ApiServiceDescriptor 
serviceDescriptor)
+                throws IOException {
+            SocketAddress socketAddress =
+                    
SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+            checkArgument(
+                    socketAddress instanceof InetSocketAddress,
+                    "%s requires a host:port socket address, got %s",
+                    EpollSocket.class.getSimpleName(),
+                    serviceDescriptor.getUrl());
+            return createServer(services, (InetSocketAddress) socketAddress);
+        }
+
+        private static Server createServer(List<BindableService> services, 
InetSocketAddress socket)
+                throws IOException {
+            ServerBuilder builder =
+                    NettyServerBuilder.forAddress(socket)
+                            .channelType(EpollServerSocketChannel.class)
+                            .workerEventLoopGroup(new EpollEventLoopGroup())
+                            .bossEventLoopGroup(new EpollEventLoopGroup())
+                            .maxMessageSize(Integer.MAX_VALUE)
+                            .permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, 
TimeUnit.SECONDS);
+            for (BindableService service : services) {
+                // Wrap the service to extract headers
+                builder.addService(
+                        ServerInterceptors.intercept(
+                                service, 
GrpcContextHeaderAccessorProvider.interceptor()));
+            }
+            return builder.build().start();
+        }
+    }
+
+    /**
+     * Factory that constructs client-accessible URLs from a local server 
address and port.
+     * Necessary when clients access server from a different networking 
context.
+     */
+    @FunctionalInterface
+    public interface UrlFactory {
+        String createUrl(String address, int port);
+
+        static UrlFactory createDefault() {
+            return (host, port) -> HostAndPort.fromParts(host, 
port).toString();
+        }
+    }
+}

Reply via email to