[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985760#comment-15985760
 ] 

ASF GitHub Bot commented on BAHIR-72:
-

Github user ckadner commented on the issue:

https://github.com/apache/bahir-flink/pull/7
  
Sadly we did not have *Scalatest* enabled at the time this PR was reviewed, 
so we missed adding automated unit tests.

I opened [BAHIR-113: Flink Netty connector missing automated unit 
tests](https://issues.apache.org/jira/browse/BAHIR-113) to keep track of that.

@shijinkui -- would you be willing to take that on and open a PR for  
[BAHIR-113](https://issues.apache.org/jira/browse/BAHIR-113)?


> support netty: pushed tcp/http connector
>
> Key: BAHIR-72
> URL: https://issues.apache.org/jira/browse/BAHIR-72
> Project: Bahir
> Issue Type: New Feature
> Components: Flink Streaming Connectors
> Reporter: shijinkui
> Assignee: shijinkui
> Fix For: Flink-0.1
>
> Also discussed in Flink: https://issues.apache.org/jira/browse/FLINK-4630
> When the source stream starts, it listens on a provided TCP port and receives 
> stream data from the user's data source.
> This Netty TCP source is keep-alive and end-to-end, that is, data flows from 
> the business system directly to the Flink worker.
> Such a source service is indeed needed in production.
> The source in detail:
> 1. The source runs as a Netty TCP and HTTP server.
> 2. The user provides a TCP port; if the port is in use, the port number is 
> increased, staying between 1024 and 65535. The source can run in parallel.
> 3. The source calls back the provided URL to report the port it is actually 
> listening on.
> 4. The user pushes streaming data to the Netty server, which collects the 
> data into Flink.
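
Step 2 of the description above (increase the port number when the requested 
port is in use, staying between 1024 and 65535) can be sketched roughly as 
follows. This is a minimal illustration, not the connector's code: it uses a 
plain java.net.ServerSocket instead of Netty, and the names PortRetrySketch, 
candidatePort and bindWithRetry are hypothetical.

```scala
import java.net.{BindException, ServerSocket}

// Hypothetical sketch: plain ServerSocket stands in for the Netty server.
object PortRetrySketch {
  val MinPort = 1024
  val MaxPort = 65535

  /** Candidate port for a retry offset; wraps back to MinPort after MaxPort. */
  def candidatePort(startPort: Int, offset: Int): Int =
    ((startPort + offset - MinPort) % (MaxPort - MinPort + 1)) + MinPort

  /** Bind the first free port at or after startPort; give up after maxRetries collisions. */
  def bindWithRetry(startPort: Int, maxRetries: Int = 128): ServerSocket = {
    require(startPort >= MinPort && startPort <= MaxPort,
      s"startPort must be in [$MinPort, $MaxPort]")

    @annotation.tailrec
    def attempt(offset: Int): ServerSocket = {
      val port = candidatePort(startPort, offset)
      // A BindException means the port is taken; once retries are exhausted
      // the guard fails and the exception propagates to the caller.
      val bound =
        try Some(new ServerSocket(port))
        catch { case _: BindException if offset < maxRetries => None }
      bound match {
        case Some(socket) => socket
        case None => attempt(offset + 1)
      }
    }

    attempt(0)
  }
}
```

The port actually bound can then be read from the returned socket with 
getLocalPort, which is the value step 3 would report to the callback URL.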



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689726#comment-15689726
 ] 

ASF GitHub Bot commented on BAHIR-72:
-

Github user asfgit closed the pull request at:

https://github.com/apache/bahir-flink/pull/7




[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector

2016-11-23 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689725#comment-15689725
 ] 

ASF subversion and git services commented on BAHIR-72:
--

Commit 2d1225f9a3c5401c48267b5bcef49b19254ea3aa in bahir-flink's branch 
refs/heads/master from [~shijinkui]
[ https://git-wip-us.apache.org/repos/asf?p=bahir-flink.git;h=2d1225f ]

[BAHIR-72] support netty: pushed tcp/http connector
When the source stream starts, it listens on a provided TCP port and receives 
stream data from the user's data source.
This Netty TCP source is keep-alive and end-to-end, that is, data flows from 
the business system directly to the Flink worker.
1.  The source runs as a Netty TCP and HTTP server.
2.  The user provides a TCP port; if the port is in use, the port number is 
increased, staying between 1024 and 65535. The source can run in parallel.
3.  The source calls back the provided URL to report the port it is actually 
listening on.
4.  The user pushes streaming data to the Netty server, which collects the 
data into Flink.

This closes #7




[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689713#comment-15689713
 ] 

ASF GitHub Bot commented on BAHIR-72:
-

Github user rmetzger commented on the issue:

https://github.com/apache/bahir-flink/pull/7
  
Thanks a lot for the updates. I was traveling for the last few weeks, which 
is why I didn't have time to review your changes earlier.

I'll merge the PR.




[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688368#comment-15688368
 ] 

ASF GitHub Bot commented on BAHIR-72:
-

Github user shijinkui commented on the issue:

https://github.com/apache/bahir-flink/pull/7
  
@rmetzger any problem?




[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15603840#comment-15603840
 ] 

ASF GitHub Bot commented on BAHIR-72:
-

Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/bahir-flink/pull/7#discussion_r84816760
  
--- Diff: 
flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.net._
+
+import org.apache.commons.lang3.SystemUtils
+import org.mortbay.util.MultiException
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Netty utility for starting a Netty service and retrying on TCP port collisions
+ */
+object NettyUtil {
+  private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  /** find local inet addresses */
+  def findLocalInetAddress(): InetAddress = {
+
+    val address = InetAddress.getLocalHost
+    address.isLoopbackAddress match {
+      case true =>
+        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+        // a better address using the local network interfaces
+        // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
+        // on unix-like system. On windows, it returns in index order.
+        // It's more proper to pick ip address following system output order.
+        val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq
+        val reOrderedNetworkIFs = SystemUtils.IS_OS_WINDOWS match {
+          case true => activeNetworkIFs
+          case false => activeNetworkIFs.reverse
+        }
+
+        reOrderedNetworkIFs.find { ni: NetworkInterface =>
+          val addr = ni.getInetAddresses.asScala.toSeq.filterNot { addr =>
+            addr.isLinkLocalAddress || addr.isLoopbackAddress
+          }
+          addr.nonEmpty
+        } match {
+          case Some(ni) =>
+            val addr = ni.getInetAddresses.asScala.toSeq.filterNot { inet =>
+              inet.isLinkLocalAddress || inet.isLoopbackAddress
+            }
+            val address = addr.find(_.isInstanceOf[Inet4Address]).getOrElse(addr.head).getAddress
+            // because of Inet6Address.toHostName may add interface at the end if it knows about it
+            InetAddress.getByAddress(address)
+          case None => address
+        }
+      case false => address
+    }
+  }
+
+  /** start service; on a port collision, retry up to 128 times */
+  def startServiceOnPort[T](
+    startPort: Int,
+    startService: Int => T,
+    maxRetries: Int = 128,
+    serviceName: String = ""): T = {
+
+    if (startPort != 0 && (startPort < 1024 || startPort > 65535)) {
+      throw new Exception("startPort should be between 1024 and 65535 (inclusive), " +
+        "or 0 for a random free port.")
+    }
+
+    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    for (offset <- 0 to maxRetries) {
+      // Do not increment port if startPort is 0, which is treated as a special port
+      val tryPort = if (startPort == 0) {
+        startPort
+      } else {
+        // If the new port wraps around, do not try a privileged port
+        ((startPort + offset - 1024) % (65536 - 1024)) + 1024
+      }
+
+      try {
+        val result = startService(tryPort)
+        logger.info(s"Successfully started service$serviceString, result:$result.")
+        return result
+      } catch {
+        case e: Exception if isBindCollision(e) =>
+          if (offset >= maxRetries) {
+            val except

[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601868#comment-15601868
 ] 

ASF GitHub Bot commented on BAHIR-72:
-

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/bahir-flink/pull/7#discussion_r84674575
  
--- Diff: flink-connector-netty/pom.xml ---
@@ -0,0 +1,87 @@
+
+
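+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink_parent_2.11</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-netty_2.11</artifactId>
+  <name>flink-connector-netty</name>
+  <version>1.0.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <version>4.1.5.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>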
--- End diff --

I don't think it's a good idea to add the Table API here as a dependency 
just for one connector example.

Flink Table could grow quite big in the future, and users of the connector 
will not execute a single line of the Table API.




[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601869#comment-15601869
 ] 

ASF GitHub Bot commented on BAHIR-72:
-

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/bahir-flink/pull/7#discussion_r84675183
  
--- Diff: 
flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.net._
+
+import org.apache.commons.lang3.SystemUtils
+import org.mortbay.util.MultiException
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Netty utility for starting a Netty service and retrying on TCP port collisions
+ */
+object NettyUtil {
+  private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  /** find local inet addresses */
+  def findLocalInetAddress(): InetAddress = {
+
+    val address = InetAddress.getLocalHost
+    address.isLoopbackAddress match {
+      case true =>
+        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+        // a better address using the local network interfaces
+        // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
+        // on unix-like system. On windows, it returns in index order.
+        // It's more proper to pick ip address following system output order.
+        val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq
+        val reOrderedNetworkIFs = SystemUtils.IS_OS_WINDOWS match {
+          case true => activeNetworkIFs
+          case false => activeNetworkIFs.reverse
+        }
+
+        reOrderedNetworkIFs.find { ni: NetworkInterface =>
+          val addr = ni.getInetAddresses.asScala.toSeq.filterNot { addr =>
+            addr.isLinkLocalAddress || addr.isLoopbackAddress
+          }
+          addr.nonEmpty
+        } match {
+          case Some(ni) =>
+            val addr = ni.getInetAddresses.asScala.toSeq.filterNot { inet =>
+              inet.isLinkLocalAddress || inet.isLoopbackAddress
+            }
+            val address = addr.find(_.isInstanceOf[Inet4Address]).getOrElse(addr.head).getAddress
+            // because of Inet6Address.toHostName may add interface at the end if it knows about it
+            InetAddress.getByAddress(address)
+          case None => address
+        }
+      case false => address
+    }
+  }
+
+  /** start service; on a port collision, retry up to 128 times */
+  def startServiceOnPort[T](
+    startPort: Int,
+    startService: Int => T,
+    maxRetries: Int = 128,
+    serviceName: String = ""): T = {
+
+    if (startPort != 0 && (startPort < 1024 || startPort > 65535)) {
+      throw new Exception("startPort should be between 1024 and 65535 (inclusive), " +
+        "or 0 for a random free port.")
+    }
+
+    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    for (offset <- 0 to maxRetries) {
+      // Do not increment port if startPort is 0, which is treated as a special port
+      val tryPort = if (startPort == 0) {
+        startPort
+      } else {
+        // If the new port wraps around, do not try a privileged port
+        ((startPort + offset - 1024) % (65536 - 1024)) + 1024
+      }
+
+      try {
+        val result = startService(tryPort)
+        logger.info(s"Successfully started service$serviceString, result:$result.")
+        return result
+      } catch {
+        case e: Exception if isBindCollision(e) =>
+          if (offset >= maxRetries) {
+            val excepti

[jira] [Commented] (BAHIR-72) support netty: pushed tcp/http connector

2016-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BAHIR-72?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601725#comment-15601725
 ] 

ASF GitHub Bot commented on BAHIR-72:
-

GitHub user shijinkui opened a pull request:

https://github.com/apache/bahir-flink/pull/7

[BAHIR-72] support netty: pushed tcp/http connector

When the source stream starts, it listens on a provided TCP port and receives 
stream data from the user's data source.
This Netty TCP source is keep-alive and end-to-end, that is, data flows from 
the business system directly to the Flink worker.
1.  The source runs as a Netty TCP and HTTP server.
2.  The user provides a TCP port; if the port is in use, the port number is 
increased, staying between 1024 and 65535. The source can run in parallel.
3.  The source calls back the provided URL to report the port it is actually 
listening on.
4.  The user pushes streaming data to the Netty server, which collects the 
data into Flink.
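
To illustrate step 4, here is a rough sketch of a business system pushing 
records to a listening TCP source. It is not the connector's implementation: 
plain java.net sockets stand in for Netty, newline-delimited framing is an 
assumption, and TcpPushSketch, serveOnce, push and roundTrip are hypothetical 
names. The collect callback plays the role of handing each record to Flink.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.{InetAddress, ServerSocket, Socket}
import java.nio.charset.StandardCharsets
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical sketch: java.net sockets stand in for the Netty server.
object TcpPushSketch {

  /** Stand-in for the source: accept one connection, pass each received line to `collect`. */
  def serveOnce(server: ServerSocket, collect: String => Unit): Thread = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        val conn = server.accept()
        try {
          val in = new BufferedReader(
            new InputStreamReader(conn.getInputStream, StandardCharsets.UTF_8))
          // readLine returns null once the client closes the connection
          Iterator.continually(in.readLine()).takeWhile(_ != null).foreach(collect)
        } finally conn.close()
      }
    })
    worker.setDaemon(true)
    worker.start()
    worker
  }

  /** Stand-in for the business system: push newline-delimited records to host:port. */
  def push(host: InetAddress, port: Int, records: Seq[String]): Unit = {
    val socket = new Socket(host, port)
    try {
      val out = new PrintWriter(
        new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8))
      records.foreach(r => out.print(r + "\n"))
      out.flush()
    } finally socket.close()
  }

  /** Push records over loopback and return what the server side collected. */
  def roundTrip(records: Seq[String]): List[String] = {
    val received = new LinkedBlockingQueue[String]()
    val loopback = InetAddress.getLoopbackAddress
    val server = new ServerSocket(0, 50, loopback) // port 0: let the OS pick a free port
    try {
      val worker = serveOnce(server, line => received.put(line))
      push(loopback, server.getLocalPort, records)
      worker.join(5000) // wait for the reader to see end-of-stream
      received.toArray(new Array[String](0)).toList
    } finally server.close()
  }
}
```

Calling TcpPushSketch.roundTrip(Seq("a", "b")) pushes two records over the 
loopback interface and returns the lines the server side collected.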

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/bahir-flink netty_connector

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/bahir-flink/pull/7.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7


commit a189597d11bfabb8af2fc671dd68cf8cc57d3fb2
Author: shijinkui 
Date:   2016-10-24T11:40:24Z

[BAHIR-72] support netty: pushed tcp/http connector
When the source stream starts, it listens on a provided TCP port and receives 
stream data from the user's data source.
This Netty TCP source is keep-alive and end-to-end, that is, data flows from 
the business system directly to the Flink worker.
1.  The source runs as a Netty TCP and HTTP server.
2.  The user provides a TCP port; if the port is in use, the port number is 
increased, staying between 1024 and 65535. The source can run in parallel.
3.  The source calls back the provided URL to report the port it is actually 
listening on.
4.  The user pushes streaming data to the Netty server, which collects the 
data into Flink.



