[
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601869#comment-15601869
]
ASF GitHub Bot commented on BAHIR-72:
-------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/bahir-flink/pull/7#discussion_r84675183
--- Diff:
flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.net._
+
+import org.apache.commons.lang3.SystemUtils
+import org.mortbay.util.MultiException
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Netty Utility class for start netty service and retry tcp port
+ */
+object NettyUtil {
+ private lazy val logger = LoggerFactory.getLogger(getClass)
+
+ /** find local inet addresses */
+ def findLocalInetAddress(): InetAddress = {
+
+ val address = InetAddress.getLocalHost
+ address.isLoopbackAddress match {
+ case true =>
+ // Address resolves to something like 127.0.1.1, which happens on
Debian; try to find
+ // a better address using the local network interfaces
+ // getNetworkInterfaces returns ifs in reverse order compared to
ifconfig output order
+ // on unix-like system. On windows, it returns in index order.
+ // It's more proper to pick ip address following system output
order.
+ val activeNetworkIFs =
NetworkInterface.getNetworkInterfaces.asScala.toSeq
+ val reOrderedNetworkIFs = SystemUtils.IS_OS_WINDOWS match {
+ case true => activeNetworkIFs
+ case false => activeNetworkIFs.reverse
+ }
+
+ reOrderedNetworkIFs.find { ni: NetworkInterface =>
+ val addr = ni.getInetAddresses.asScala.toSeq.filterNot { addr =>
+ addr.isLinkLocalAddress || addr.isLoopbackAddress
+ }
+ addr.nonEmpty
+ } match {
+ case Some(ni) =>
+ val addr = ni.getInetAddresses.asScala.toSeq.filterNot { inet
=>
+ inet.isLinkLocalAddress || inet.isLoopbackAddress
+ }
+ val address =
addr.find(_.isInstanceOf[Inet4Address]).getOrElse(addr.head).getAddress
+ // because of Inet6Address.toHostName may add interface at the
end if it knows about it
+ InetAddress.getByAddress(address)
+ case None => address
+ }
+ case false => address
+ }
+ }
+
+ /** start service, if port is collision, retry 128 times */
+ def startServiceOnPort[T](
+ startPort: Int,
+ startService: Int => T,
+ maxRetries: Int = 128,
+ serviceName: String = ""): T = {
+
+ if (startPort != 0 && (startPort < 1024 || startPort > 65536)) {
+ throw new Exception("startPort should be between 1024 and 65535
(inclusive), " +
+ "or 0 for a random free port.")
+ }
+
+ val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+ for (offset <- 0 to maxRetries) {
+ // Do not increment port if startPort is 0, which is treated as a
special port
+ val tryPort = if (startPort == 0) {
+ startPort
+ } else {
+ // If the new port wraps around, do not try a privilege port
+ ((startPort + offset - 1024) % (65536 - 1024)) + 1024
+ }
+
+ try {
+ val result = startService(tryPort)
+ logger.info(s"Successfully started service$serviceString,
result:$result.")
+ return result
+ } catch {
+ case e: Exception if isBindCollision(e) =>
+ if (offset >= maxRetries) {
+ val exceptionMessage = s"${e.getMessage}:
Service$serviceString failed after " +
+ s"$maxRetries retries! Consider explicitly setting the
appropriate port for the " +
+ s"service$serviceString (for example spark.ui.port for
SparkUI) to an available " +
+ "port or increasing spark.port.maxRetries."
--- End diff --
The exception message doesn't seem to be relevant to Flink.
In general, it seems that the code here seems to be copied from Apache
Spark:
https://github.com/apache/spark/blob/39755169fb5bb07332eef263b4c18ede1528812d/core/src/main/scala/org/apache/spark/util/Utils.scala#L2172
Can you add comments to the code copied from other projects?
> support netty: pushed tcp/http connector
> ----------------------------------------
>
> Key: BAHIR-72
> URL: https://issues.apache.org/jira/browse/BAHIR-72
> Project: Bahir
> Issue Type: New Feature
> Components: Flink Streaming Connectors
> Reporter: shijinkui
>
> Also discuss from flink: https://issues.apache.org/jira/browse/FLINK-4630
> When source stream get start, listen a provided tcp port, receive stream data
> from user data source.
> This netty tcp source is keepping alive and end-to-end, that is from business
> system to flink worker directly.
> Such source service is needed in produce indeed.
> describe the source in detail below:
> 1. source run as a netty tcp and http server
> 2. user provide a tcp port, if the port is in used, increace the port
> number between 1024 to 65535. Source can parallel.
> 3. callback the provided url to report the real port to listen
> 4. user push streaming data to netty server, then collect the data to flink
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)