[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6071


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194770093
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
 ---
@@ -88,6 +102,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) 
throws Exception {
}
}
 
+   @Override
+   public long length() {
+   return length;
+   }
+
+   @Override
+   public long progress() {
+   return length - buf.readableBytes();
--- End diff --

changed to `return buf.readerIndex()` since progress is not well defined.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194769995
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
 ---
@@ -88,6 +102,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) 
throws Exception {
}
}
 
+   @Override
+   public long length() {
+   return length;
--- End diff --

changed to `-1`


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194763826
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
 ---
@@ -75,12 +75,12 @@ private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int 
index, int length, int m
 
@Override
public ByteBuf unwrap() {
-   return super.unwrap().unwrap();
+   return super.unwrap();
--- End diff --

As discussed offline, this is because of a change in the implementation of 
`SlicedByteBuf`.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194718801
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
+import static 
org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+
+/**
+ * Test network stack with taskmanager.network.netty.transport set to 
epoll. This test car only run
+ * on linux. On other platforms it's basically a NO-OP. See
+ * https://github.com/apache/flink-shaded/issues/30
+ */
+public class NettyEpollITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEpollITCase.class);
+
+   private static final int TASK_MANAGERES = 2;
+
+   @Test
+   public void testNettyEpoll() throws Exception {
+   Optional cluster = trySetUp();
+   if (!cluster.isPresent()) {
+   return;
+   }
+
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(TASK_MANAGERES);
+   env.getConfig().disableSysoutLogging();
+
+   DataStream input = env.fromElements(1, 2, 3, 
4, 1, 2, 3, 42);
+   input.keyBy(new KeySelector() {
+   @Override
+   public Integer getKey(Integer value) 
throws Exception {
+   return value;
+   }
+   })
+   .sum(0)
+   .print();
+
+   env.execute();
--- End diff --

The same question would remain: whether such an artificial setup sets up the 
network stack in the same way as Flink does.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194713008
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
 ---
@@ -97,11 +98,17 @@ public ReferenceCounted retain(int arg0) {
 
@Override
public ReferenceCounted touch() {
+   if (requestAsReferenceCounted.isPresent()) {
+   ReferenceCountUtil.touch(requestAsReferenceCounted.get());
+   }
return this;
}
 
@Override
public ReferenceCounted touch(Object o) {
+   if (requestAsReferenceCounted.isPresent()) {
+   ReferenceCountUtil.touch(requestAsReferenceCounted.get());
--- End diff --

I think we could do that. I'm not entirely sure how this whole `touch` is 
supposed to work, but passing down the hint shouldn't harm anything and would 
be more correct.
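A minimal sketch of what passing the hint down could look like. `Touchable`, `RecordingRequest` and `Wrapper` are hypothetical stand-ins (not Netty or Flink classes) so that the example compiles without Netty on the classpath; in the real code the forwarding would presumably go through the two-argument `ReferenceCountUtil.touch(msg, hint)` overload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class TouchForwardingSketch {

    /** Hypothetical, reduced stand-in for Netty's ReferenceCounted (touch methods only). */
    interface Touchable {
        Touchable touch();

        Touchable touch(Object hint);
    }

    /** Records every hint it receives so that the forwarding can be observed. */
    static final class RecordingRequest implements Touchable {
        final List<Object> hints = new ArrayList<>();

        @Override
        public Touchable touch() {
            hints.add("<no hint>");
            return this;
        }

        @Override
        public Touchable touch(Object hint) {
            hints.add(hint);
            return this;
        }
    }

    /** Plays the role of the wrapping request: it may or may not hold a touchable delegate. */
    static final class Wrapper implements Touchable {
        private final Optional<Touchable> delegate;

        Wrapper(Touchable delegateOrNull) {
            this.delegate = Optional.ofNullable(delegateOrNull);
        }

        @Override
        public Touchable touch() {
            delegate.ifPresent(d -> d.touch());
            return this;
        }

        @Override
        public Touchable touch(Object hint) {
            // Forward the caller's hint instead of dropping it.
            delegate.ifPresent(d -> d.touch(hint));
            return this;
        }
    }

    public static void main(String[] args) {
        RecordingRequest inner = new RecordingRequest();
        new Wrapper(inner).touch("decoder");
        System.out.println(inner.hints);
    }
}
```

With the hint forwarded, whatever leak-tracking record the delegate keeps would carry the caller's context rather than an anonymous access.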


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194711763
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
 ---
@@ -88,6 +102,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) 
throws Exception {
}
}
 
+   @Override
+   public long length() {
+   return length;
--- End diff --

It seems that the content of `buf` does not change. However, we cannot guard 
against that programmatically, only via Javadoc. On the other hand, I'm not 
sure what the side effects of an "unknown" length would be.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194671209
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
+import static 
org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+
+/**
+ * Test network stack with taskmanager.network.netty.transport set to 
epoll. This test car only run
+ * on linux. On other platforms it's basically a NO-OP. See
+ * https://github.com/apache/flink-shaded/issues/30
+ */
+public class NettyEpollITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEpollITCase.class);
+
+   private static final int TASK_MANAGERES = 2;
+
+   @Test
+   public void testNettyEpoll() throws Exception {
+   Optional cluster = trySetUp();
+   if (!cluster.isPresent()) {
+   return;
+   }
+
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(TASK_MANAGERES);
+   env.getConfig().disableSysoutLogging();
+
+   DataStream input = env.fromElements(1, 2, 3, 
4, 1, 2, 3, 42);
+   input.keyBy(new KeySelector() {
+   @Override
+   public Integer getKey(Integer value) 
throws Exception {
+   return value;
+   }
+   })
+   .sum(0)
+   .print();
+
+   env.execute();
--- End diff --

Is it strictly necessary for this test to be an integration test case? 
As far as I understand the test case, wouldn't it be enough to set up a 
`NettyServer` with `epoll` activated and send some data to it via a 
`NettyClient` with `epoll` activated? Since we don't assert anything other than 
whether the Flink program executes, it should be basically the same.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r19412
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java
 ---
@@ -64,12 +69,13 @@
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeFalse;
 import static org.junit.Assume.assumeTrue;
 
 /**
  * An abstract test class for channel buffers.
  *
- * Copied from netty 4.0.50 with some changes to fit our netty version 
4.0.27.
+ * Copy from netty 4.1.24.Final.
  */
 public abstract class AbstractByteBufTest {
--- End diff --

Let's extend this class from `TestLogger`.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194663296
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
 ---
@@ -97,11 +98,17 @@ public ReferenceCounted retain(int arg0) {
 
@Override
public ReferenceCounted touch() {
+   if (requestAsReferenceCounted.isPresent()) {
+   ReferenceCountUtil.touch(requestAsReferenceCounted.get());
+   }
return this;
}
 
@Override
public ReferenceCounted touch(Object o) {
+   if (requestAsReferenceCounted.isPresent()) {
+   ReferenceCountUtil.touch(requestAsReferenceCounted.get());
--- End diff --

Shall we pass `o` to the `ReferenceCountUtil.touch` call?


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194652733
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
 ---
@@ -88,6 +102,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) 
throws Exception {
}
}
 
+   @Override
+   public long length() {
+   return length;
+   }
+
+   @Override
+   public long progress() {
+   return length - buf.readableBytes();
--- End diff --

If the length is not known, then I think it would be better to return 
something like `buf.readerIndex() - initialReaderIndex`.
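A rough sketch of that suggestion, combined with returning `-1` for an unknown length. `SimpleBuf` is a hypothetical stand-in that only tracks a reader index (it is not Netty's `ByteBuf`), and `ChunkProgressSketch` does not implement the real `ChunkedInput` interface; it only mirrors the `length()`/`progress()` pair under the assumption that a negative length means "unknown".

```java
class ChunkProgressSketch {

    /** Hypothetical stand-in for a readable buffer; this is not Netty's ByteBuf. */
    static final class SimpleBuf {
        private final int capacity;
        private int readerIndex;

        SimpleBuf(int capacity, int readerIndex) {
            this.capacity = capacity;
            this.readerIndex = readerIndex;
        }

        int readerIndex() {
            return readerIndex;
        }

        int readableBytes() {
            return capacity - readerIndex;
        }

        void skipBytes(int n) {
            if (n < 0 || n > readableBytes()) {
                throw new IndexOutOfBoundsException("cannot skip " + n + " bytes");
            }
            readerIndex += n;
        }
    }

    private final SimpleBuf buf;
    private final int chunkSize;
    private final int initialReaderIndex;

    ChunkProgressSketch(SimpleBuf buf, int chunkSize) {
        this.buf = buf;
        this.chunkSize = chunkSize;
        // Remember where reading started so progress is relative to this input.
        this.initialReaderIndex = buf.readerIndex();
    }

    /** Assumption: a negative value signals that the total length is unknown. */
    long length() {
        return -1;
    }

    /** Bytes consumed so far, independent of where the reader index started. */
    long progress() {
        return buf.readerIndex() - initialReaderIndex;
    }

    /** Consumes up to chunkSize bytes and returns how many were consumed. */
    int readChunk() {
        int n = Math.min(chunkSize, buf.readableBytes());
        buf.skipBytes(n);
        return n;
    }

    public static void main(String[] args) {
        ChunkProgressSketch input = new ChunkProgressSketch(new SimpleBuf(10, 4), 4);
        input.readChunk();
        System.out.println(input.length() + " " + input.progress());
    }
}
```

Subtracting the initial reader index keeps `progress()` at zero before the first chunk even when the buffer was handed over partially read.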


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194660815
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
 ---
@@ -75,12 +75,12 @@ private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int 
index, int length, int m
 
@Override
public ByteBuf unwrap() {
-   return super.unwrap().unwrap();
+   return super.unwrap();
--- End diff --

But isn't it a bit counter-intuitive that you instantiate a 
`ReadOnlySlicedNetworkBuffer` with a `NetworkBuffer`, and when you call 
`ReadOnlySlicedNetworkBuffer.unwrap` you don't get a `NetworkBuffer` back but a 
`SlicedByteBuf`? Thus, you need to know the internals of this class to know 
that you have to call `unwrap` twice to obtain the `NetworkBuffer`. This shows, 
for example, in the `NetworkBufferTest`, where we always call 
`slice.unwrap().unwrap()`.

What exactly changed in the structure of nested classes that makes this 
change necessary?
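To make the concern concrete, here is a toy model of the two `unwrap` variants from the diff. All classes are hypothetical stand-ins that only model `unwrap()`; they do not reproduce Netty's actual class hierarchy or explain what changed in it. `RootBuf` plays the role of `NetworkBuffer` and `SliceBuf` the role of `SlicedByteBuf`.

```java
class UnwrapDepthSketch {

    /** Hypothetical stand-in for a buffer; it only models unwrap(). */
    static class Buf {
        private final Buf wrapped;

        Buf(Buf wrapped) {
            this.wrapped = wrapped;
        }

        /** Returns the directly wrapped buffer, or null for a root buffer. */
        Buf unwrap() {
            return wrapped;
        }
    }

    /** Plays the role of the original network buffer. */
    static final class RootBuf extends Buf {
        RootBuf() {
            super(null);
        }
    }

    /** Plays the role of the intermediate slice. */
    static final class SliceBuf extends Buf {
        SliceBuf(Buf origin) {
            super(origin);
        }
    }

    /** Variant with `return super.unwrap();`: callers see the intermediate slice. */
    static final class ExposingView extends Buf {
        ExposingView(RootBuf origin) {
            super(new SliceBuf(origin));
        }
    }

    /** Variant with `return super.unwrap().unwrap();`: the slice stays an internal detail. */
    static final class HidingView extends Buf {
        HidingView(RootBuf origin) {
            super(new SliceBuf(origin));
        }

        @Override
        Buf unwrap() {
            return super.unwrap().unwrap();
        }
    }

    public static void main(String[] args) {
        RootBuf origin = new RootBuf();
        System.out.println(new ExposingView(origin).unwrap().unwrap() == origin);
        System.out.println(new HidingView(origin).unwrap() == origin);
    }
}
```

In the exposing variant, every caller that wants the original buffer has to know about the extra level, which is the `slice.unwrap().unwrap()` pattern mentioned above.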


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r194652495
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
 ---
@@ -88,6 +102,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) 
throws Exception {
}
}
 
+   @Override
+   public long length() {
+   return length;
--- End diff --

Is it always guaranteed that `buf.readableBytes` won't change? If not, then 
I think we should return `-1` here, because the length is unknown.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-06-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r193345578
  
--- Diff: pom.xml ---
@@ -300,15 +300,7 @@ under the License.

<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-netty</artifactId>
-   
-   <version>4.0.27.Final-${flink.shaded.version}</version>
+   <version>4.1.24.Final-${flink.shaded.version}</version>
--- End diff --

I've opened #6128 to bump the remaining version so this PR can stay focused 
on netty.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191685431
  
--- Diff: pom.xml ---
@@ -300,15 +300,7 @@ under the License.

<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-netty</artifactId>
-   
-   <version>4.0.27.Final-${flink.shaded.version}</version>
+   <version>4.1.24.Final-${flink.shaded.version}</version>
--- End diff --

I hope that bumping from `2.0` to `4.0` will not break other things, but 
yes, that was my intention once `4.0` is on Maven Central.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191685731
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
 ---
@@ -215,16 +242,33 @@ protected void _setMedium(int index, int value) {
setByte(index + 2, (byte) value);
}
 
+   @Override
+   protected void _setMediumLE(int index, int value){
+   setByte(index, (byte) value);
--- End diff --

added comment


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191700895
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
 ---
@@ -75,12 +75,12 @@ private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int 
index, int length, int m
 
@Override
public ByteBuf unwrap() {
-   return super.unwrap().unwrap();
+   return super.unwrap();
--- End diff --

Necessary for the upgrade; otherwise our tests (including `ITCases`) fail. 
The upgraded version has a different structure of nested classes when slicing 
read-only buffers. Possibly this is a fix for a bug that didn't show up in 
older Netty.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191700141
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
 ---
@@ -424,6 +480,19 @@ public int setBytes(int index, ScatteringByteChannel 
in, int length) throws IOEx
}
}
 
+   @Override
+   public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+   // adapted from UnpooledDirectByteBuf:
+   checkIndex(index, length);
+
+   ByteBuffer tmpBuf = memorySegment.wrap(index, length);
+   try {
+   return in.read(tmpBuf);
--- End diff --

Oops, good catch. An even better catch is that this was not covered by any 
test, because I assumed `AbstractByteBufTest` comes from Netty, while in 
reality it was copied into our code.

Fixed the missing `position` bug and upgraded `AbstractByteBufTest` as well; 
the newer version correctly failed with this bug.
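For reference, the difference between the two `FileChannel` reads can be shown with plain `java.nio`. This is only an illustration of the JDK API, not the code that went into `NetworkBuffer`; the class name, the temp file and its contents are made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class PositionalReadSketch {

    /** Reads from the given absolute file position; the channel's own position is not moved. */
    static String readAt(FileChannel in, long position, int length) throws IOException {
        ByteBuffer tmpBuf = ByteBuffer.allocate(length);
        int read = in.read(tmpBuf, position);
        return new String(tmpBuf.array(), 0, Math.max(read, 0), StandardCharsets.US_ASCII);
    }

    /** Reads from wherever the channel currently is, which is what ignoring `position` amounts to. */
    static String readAtChannelPosition(FileChannel in, int length) throws IOException {
        ByteBuffer tmpBuf = ByteBuffer.allocate(length);
        int read = in.read(tmpBuf);
        return new String(tmpBuf.array(), 0, Math.max(read, 0), StandardCharsets.US_ASCII);
    }

    /** Returns {positional read, relative read} for a small temp file containing "0123456789". */
    static String[] demo() {
        try {
            Path file = Files.createTempFile("positional-read", ".bin");
            try {
                Files.write(file, "0123456789".getBytes(StandardCharsets.US_ASCII));
                try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
                    String positional = readAt(in, 4, 3);
                    String relative = readAtChannelPosition(in, 3);
                    return new String[] {positional, relative};
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println(result[0] + " vs " + result[1]);
    }
}
```

The relative read starts at the channel's current position (the beginning of the file here), so a caller asking for data at offset 4 would silently get different bytes.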


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191684561
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
+import static 
org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+
+/**
+ * Test network stack with taskmanager.network.netty.transport set to 
epoll. This test car only run
+ * on linux. On other platforms it's basically a NO-OP. See
+ * https://github.com/apache/flink-shaded/issues/30
+ */
+public class NettyEpollITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEpollITCase.class);
+
+   private static final int TASK_MANAGERES = 2;
+
+   @Test
+   public void testNettyEpoll() throws Exception {
+   Optional cluster = trySetUp();
+   if (!cluster.isPresent()) {
+   return;
+   }
+
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(TASK_MANAGERES);
+   env.getConfig().disableSysoutLogging();
+
+   DataStream input = env.fromElements(1, 2, 3, 
4, 1, 2, 3, 42);
+   input.keyBy(new KeySelector() {
+   @Override
+   public Integer getKey(Integer value) 
throws Exception {
+   return value;
+   }
+   })
+   .sum(0)
+   .print();
+
+   env.execute();
--- End diff --

Hmm, it depends on what you would like to test and on whether you take a 
black-box or white-box approach. With a white-box approach, where you 
assume/know that `Netty` loads `native` libraries only during setup, and you 
assume that once loaded they work correctly, then no, execution is 
unnecessary. But I don't want to rely on either of those assumptions.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191683459
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
+import static 
org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+
+/**
+ * Test network stack with taskmanager.network.netty.transport set to 
epoll. This test car only run
+ * on linux. On other platforms it's basically a NO-OP. See
+ * https://github.com/apache/flink-shaded/issues/30
+ */
+public class NettyEpollITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEpollITCase.class);
+
+   private static final int TASK_MANAGERES = 2;
+
+   @Test
+   public void testNettyEpoll() throws Exception {
+   Optional cluster = trySetUp();
+   if (!cluster.isPresent()) {
+   return;
+   }
+
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(TASK_MANAGERES);
+   env.getConfig().disableSysoutLogging();
+
+   DataStream input = env.fromElements(1, 2, 3, 
4, 1, 2, 3, 42);
+   input.keyBy(new KeySelector() {
+   @Override
+   public Integer getKey(Integer value) 
throws Exception {
+   return value;
+   }
+   })
+   .sum(0)
+   .print();
+
+   env.execute();
+   }
+   finally {
+   cluster.get().after();
+   }
+   }
+
+   private Optional trySetUp() throws Exception {
+   try {
+   MiniClusterResource cluster = new MiniClusterResource(
+   new MiniClusterResourceConfiguration(
+   getConfiguration(),
+   TASK_MANAGERES,
+   1),
+   true);
+   cluster.before();
+   return Optional.of(cluster);
+   }
+   catch (UnsatisfiedLinkError ex) {
+   // If we failed to init netty because we are not on 
Linux platform, abort the test.
+   if (findThrowableWithMessage(ex, "Only supported on 
Linux").isPresent()) {
+   return Optional.empty();
--- End diff --

I didn't know about `AssumptionViolatedException`. Thanks for pointing this 
out.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191386855
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
 ---
@@ -424,6 +480,19 @@ public int setBytes(int index, ScatteringByteChannel 
in, int length) throws IOEx
}
}
 
+   @Override
+   public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+   // adapted from UnpooledDirectByteBuf:
+   checkIndex(index, length);
+
+   ByteBuffer tmpBuf = memorySegment.wrap(index, length);
+   try {
+   return in.read(tmpBuf);
--- End diff --

are you ignoring the `position` argument intentionally?


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191379939
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
+import static 
org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+
+/**
+ * Test network stack with taskmanager.network.netty.transport set to 
epoll. This test car only run
+ * on linux. On other platforms it's basically a NO-OP. See
+ * https://github.com/apache/flink-shaded/issues/30
+ */
+public class NettyEpollITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEpollITCase.class);
+
+   private static final int TASK_MANAGERES = 2;
+
+   @Test
+   public void testNettyEpoll() throws Exception {
+   Optional cluster = trySetUp();
+   if (!cluster.isPresent()) {
+   return;
+   }
+
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(TASK_MANAGERES);
+   env.getConfig().disableSysoutLogging();
+
+   DataStream input = env.fromElements(1, 2, 3, 
4, 1, 2, 3, 42);
+   input.keyBy(new KeySelector() {
+   @Override
+   public Integer getKey(Integer value) 
throws Exception {
+   return value;
+   }
+   })
+   .sum(0)
+   .print();
+
+   env.execute();
+   }
+   finally {
+   cluster.get().after();
+   }
+   }
+
+   private Optional trySetUp() throws Exception {
+   try {
+   MiniClusterResource cluster = new MiniClusterResource(
+   new MiniClusterResourceConfiguration(
+   getConfiguration(),
+   TASK_MANAGERES,
+   1),
+   true);
+   cluster.before();
+   return Optional.of(cluster);
+   }
+   catch (UnsatisfiedLinkError ex) {
+   // If we failed to init netty because we are not on 
Linux platform, abort the test.
+   if (findThrowableWithMessage(ex, "Only supported on 
Linux").isPresent()) {
+   return Optional.empty();
--- End diff --

couldn't you fail here with an `AssumptionViolatedException`? Then we 
wouldn't have to deal with optionals.
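A sketch of the control flow this would give. To keep it runnable without JUnit or Flink on the classpath, `AssumptionViolated` is a hypothetical stand-in for JUnit's `AssumptionViolatedException`, and `findThrowableWithMessage` is a simplified re-implementation written for this example, not Flink's `ExceptionUtils` method.

```java
import java.util.Optional;

class SkipOnUnsupportedPlatformSketch {

    /** Hypothetical stand-in for JUnit's AssumptionViolatedException. */
    static final class AssumptionViolated extends RuntimeException {
        AssumptionViolated(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Simplified stand-in: walks the cause chain looking for a message fragment. */
    static Optional<Throwable> findThrowableWithMessage(Throwable throwable, String fragment) {
        Throwable current = throwable;
        while (current != null) {
            String message = current.getMessage();
            if (message != null && message.contains(fragment)) {
                return Optional.of(current);
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    /** Runs the setup; a "not on Linux" link error becomes a skip signal, anything else propagates. */
    static void setUpOrSkip(Runnable setUp) {
        try {
            setUp.run();
        } catch (UnsatisfiedLinkError ex) {
            if (findThrowableWithMessage(ex, "Only supported on Linux").isPresent()) {
                throw new AssumptionViolated("epoll transport is only available on Linux", ex);
            }
            throw ex;
        }
    }

    public static void main(String[] args) {
        try {
            setUpOrSkip(() -> {
                throw new UnsatisfiedLinkError("Only supported on Linux");
            });
        } catch (AssumptionViolated skipped) {
            System.out.println("skipped: " + skipped.getMessage());
        }
    }
}
```

With the real JUnit exception, the runner would report the test as skipped, so the test method itself needs neither an `Optional` nor an early `return`.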


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191379552
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
+import static 
org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+
+/**
+ * Test network stack with taskmanager.network.netty.transport set to 
epoll. This test car only run
--- End diff --

typo: can only run


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191386142
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
 ---
@@ -215,16 +242,33 @@ protected void _setMedium(int index, int value) {
setByte(index + 2, (byte) value);
}
 
+   @Override
+   protected void _setMediumLE(int index, int value){
+   setByte(index, (byte) value);
--- End diff --

was this also taken from `UnpooledDirectByteBuf`?
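
For reference, a minimal sketch of what a little-endian medium write is expected to do, so the byte order can be compared against the big-endian `_setMedium` above. This is not the code from this PR or from Netty: the diff only shows the first `setByte` call, and the class name, the plain `byte[]` backing store, and the read-back helper are assumptions made for illustration.

```java
public final class MediumLeSketch {

    /** Writes the low 24 bits of value at index, least significant byte first. */
    static void setMediumLE(byte[] buf, int index, int value) {
        buf[index] = (byte) value;              // bits 0..7
        buf[index + 1] = (byte) (value >>> 8);  // bits 8..15
        buf[index + 2] = (byte) (value >>> 16); // bits 16..23
    }

    /** Reads the three bytes back as an unsigned 24-bit little-endian value. */
    static int getUnsignedMediumLE(byte[] buf, int index) {
        return (buf[index] & 0xff)
            | (buf[index + 1] & 0xff) << 8
            | (buf[index + 2] & 0xff) << 16;
    }

    public static void main(String[] args) {
        byte[] buf = new byte[3];
        setMediumLE(buf, 0, 0x123456);
        System.out.printf("%02x %02x %02x%n", buf[0], buf[1], buf[2]);
        System.out.println(Integer.toHexString(getUnsignedMediumLE(buf, 0)));
    }
}
```

A round trip through both helpers is a cheap way to check that the `LE` variants in `NetworkBuffer` mirror their big-endian counterparts.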


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191385215
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
 ---
@@ -75,12 +75,12 @@ private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int 
index, int length, int m
 
@Override
public ByteBuf unwrap() {
-   return super.unwrap().unwrap();
+   return super.unwrap();
--- End diff --

Is this change necessary for the upgrade or cleanup?


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191380426
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
+import static 
org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+
+/**
+ * Test network stack with taskmanager.network.netty.transport set to 
epoll. This test car only run
+ * on linux. On other platforms it's basically a NO-OP. See
+ * https://github.com/apache/flink-shaded/issues/30
+ */
+public class NettyEpollITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEpollITCase.class);
+
+   private static final int TASK_MANAGERES = 2;
+
+   @Test
+   public void testNettyEpoll() throws Exception {
+   Optional<MiniClusterResource> cluster = trySetUp();
+   if (!cluster.isPresent()) {
+   return;
+   }
+
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(TASK_MANAGERES);
+   env.getConfig().disableSysoutLogging();
+
+   DataStream<Integer> input = env.fromElements(1, 2, 3, 
4, 1, 2, 3, 42);
+   input.keyBy(new KeySelector<Integer, Integer>() {
+   @Override
+   public Integer getKey(Integer value) 
throws Exception {
+   return value;
+   }
+   })
+   .sum(0)
+   .print();
+
+   env.execute();
+   }
+   finally {
+   cluster.get().after();
+   }
+   }
+
+   private Optional<MiniClusterResource> trySetUp() throws Exception {
+   try {
+   MiniClusterResource cluster = new MiniClusterResource(
+   new MiniClusterResourceConfiguration(
+   getConfiguration(),
+   TASK_MANAGERES,
+   1),
+   true);
+   cluster.before();
+   return Optional.of(cluster);
+   }
+   catch (UnsatisfiedLinkError ex) {
+   // If we failed to init netty because we are not on 
Linux platform, abort the test.
+   if (findThrowableWithMessage(ex, "Only supported on 
Linux").isPresent()) {
+   return Optional.empty();
+   }
+   throw ex;
+   }
+   }
+
+   private static Configuration getConfiguration() {
--- End diff --

I would in-line this method.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191384185
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
 ---
@@ -94,4 +94,14 @@ public ReferenceCounted retain(int arg0) {
}
return this;
}
+
+   @Override
+   public ReferenceCounted touch() {
+   return this;
+   }
+
+   @Override
+   public ReferenceCounted touch(Object o) {
+   return this;
--- End diff --

We may want to `touch` the contained request as well like in 
[this](https://github.com/netty/netty/blob/4.1/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayRedisMessage.java)
 class, depending on `requestAsReferenceCounted`.
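
A rough sketch of that delegation, assuming `requestAsReferenceCounted` is an `Optional` that is only populated when the wrapped request is itself reference counted. To keep the example self-contained it does not use Netty: `Touchable` is a hypothetical stand-in for the `touch` methods of `ReferenceCounted`, and `RecordingRequest` and `RoutedRequestSketch` are illustrative names, not classes from this PR.

```java
import java.util.Optional;

public final class TouchSketch {

    /** Hypothetical stand-in for the touch methods of Netty's ReferenceCounted. */
    interface Touchable {
        Touchable touch();
        Touchable touch(Object hint);
    }

    /** Test double that records how often it was touched and with which hint. */
    static final class RecordingRequest implements Touchable {
        int touches;
        Object lastHint;

        @Override
        public Touchable touch() {
            touches++;
            return this;
        }

        @Override
        public Touchable touch(Object hint) {
            touches++;
            lastHint = hint;
            return this;
        }
    }

    /** Wrapper that forwards touch calls to the contained request, if it supports them. */
    static final class RoutedRequestSketch implements Touchable {
        private final Optional<Touchable> requestAsReferenceCounted;

        RoutedRequestSketch(Object request) {
            this.requestAsReferenceCounted = request instanceof Touchable
                ? Optional.of((Touchable) request)
                : Optional.empty();
        }

        @Override
        public Touchable touch() {
            requestAsReferenceCounted.ifPresent(r -> r.touch());
            return this;
        }

        @Override
        public Touchable touch(Object hint) {
            requestAsReferenceCounted.ifPresent(r -> r.touch(hint));
            return this;
        }
    }

    public static void main(String[] args) {
        RecordingRequest inner = new RecordingRequest();
        new RoutedRequestSketch(inner).touch("leak-hint");
        System.out.println(inner.touches + " " + inner.lastHint);
    }
}
```

With Netty on the classpath the same idea would presumably go through `ReferenceCountUtil.touch`, which is what the linked class does for its children.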


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191380808
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
+import static 
org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+
+/**
+ * Test network stack with taskmanager.network.netty.transport set to 
epoll. This test car only run
+ * on linux. On other platforms it's basically a NO-OP. See
+ * https://github.com/apache/flink-shaded/issues/30
+ */
+public class NettyEpollITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEpollITCase.class);
+
+   private static final int TASK_MANAGERES = 2;
+
+   @Test
+   public void testNettyEpoll() throws Exception {
+   Optional<MiniClusterResource> cluster = trySetUp();
+   if (!cluster.isPresent()) {
+   return;
+   }
+
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(TASK_MANAGERES);
+   env.getConfig().disableSysoutLogging();
+
+   DataStream<Integer> input = env.fromElements(1, 2, 3, 
4, 1, 2, 3, 42);
+   input.keyBy(new KeySelector<Integer, Integer>() {
+   @Override
+   public Integer getKey(Integer value) 
throws Exception {
+   return value;
+   }
+   })
+   .sum(0)
+   .print();
+
+   env.execute();
--- End diff --

The job execution isn't necessary, is it?


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191381531
  
--- Diff: pom.xml ---
@@ -300,15 +300,7 @@ under the License.

org.apache.flink
flink-shaded-netty
-   
-   
4.0.27.Final-${flink.shaded.version}
+   
4.1.24.Final-${flink.shaded.version}
--- End diff --

you (will) have to bump `flink.shaded.version` to 4.0. Doing this will not 
negatively affect other dependencies.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191379687
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java 
---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import 
org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
+import static 
org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+
+/**
+ * Test network stack with taskmanager.network.netty.transport set to 
epoll. This test car only run
+ * on linux. On other platforms it's basically a NO-OP. See
+ * https://github.com/apache/flink-shaded/issues/30
+ */
+public class NettyEpollITCase extends TestLogger {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEpollITCase.class);
+
+   private static final int TASK_MANAGERES = 2;
--- End diff --

`NUM_TASK_MANAGERS`, also a typo


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r191150025
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -247,7 +249,15 @@ public void shutdown(Time timeout) {
 
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object 
msg) {
-   if (msg instanceof FullHttpResponse) {
+   // TODO: should this check for status OK (200) and 
treat all other as errors?
--- End diff --

Yes, I think we can just drop it.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-25 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r190884312
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -247,7 +249,15 @@ public void shutdown(Time timeout) {
 
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object 
msg) {
-   if (msg instanceof FullHttpResponse) {
+   // TODO: should this check for status OK (200) and 
treat all other as errors?
--- End diff --

Thanks for the explanation. In that case, should I just drop this `// TODO:`?


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r190705206
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -247,7 +249,15 @@ public void shutdown(Time timeout) {
 
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object 
msg) {
-   if (msg instanceof FullHttpResponse) {
+   // TODO: should this check for status OK (200) and 
treat all other as errors?
--- End diff --

Message semantics are handled later when parsing the payload, which 
effectively does what you're suggesting. The else branch is for debugging; our 
REST servers always return a `FullHttpResponse`.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-24 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r190554721
  
--- Diff: pom.xml ---
@@ -308,7 +308,7 @@ under the License.
errors.
 
[1] https://github.com/netty/netty/issues/3704 
-->
--- End diff --

Oops, dropped.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r190554149
  
--- Diff: pom.xml ---
@@ -308,7 +308,7 @@ under the License.
errors.
 
[1] https://github.com/netty/netty/issues/3704 
-->
--- End diff --

Looks like you can remove this comment now.


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-24 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/6071

[FLINK-3952][runtine] Upgrade to Netty 4.1

This PR adjusts our code to work with Netty 4.1. It also includes a possible 
fix for a bug in the file upload cleanup in FileUploadHandler and HttpRequestHandler. 
For more information, see:

https://github.com/netty/netty/issues/7611

The first commit exists only to keep Travis green and will be dropped once the new 
`flink-shaded-netty` is released.

## Verifying this change

This change is covered by a variety of pre-existing tests. Furthermore, I have 
manually verified that the issue mentioned by @uce in the commit message here: 
https://github.com/apache/flink/commit/d92e422ec7089376583a8f57043274d236c340a4
no longer occurs: 
- I have reproduced this issue on a test cluster with Flink 1.0-XXX
- I have verified that the same job passes without any problems after 
upgrading to Netty 4.1

I have also run our network benchmark suite and verified that this change 
does not affect performance.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (**yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f3952

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6071.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6071


commit 7b3be7c9ebac7392136ed85bca4664559710552e
Author: Piotr Nowojski 
Date:   2018-05-14T11:46:08Z

[hotfix][tests] Report failure with error level instead of debug

commit afaf1d5181c7133a040bf3881723e240145a4b0a
Author: Piotr Nowojski 
Date:   2018-05-16T19:26:36Z

[FLINK-9386] Embed netty router

This commit replaces the netty-router dependency with our own version of it, 
which is
simplified and adds guarantees about the order in which router patterns are matched.

This is a prerequisite for FLINK-3952. netty-router 1.10 is incompatible 
with
Netty 4.1, while netty-router 2.2.0 breaks compatibility in a way that
prevented us from using it.

commit 26bc92db0863bf53e60164ab5f6b92ac3b424506
Author: Piotr Nowojski 
Date:   2018-05-14T10:30:31Z

Embed flink-shaded-netty-4

commit 94a4cc2237b5dac0c004ec192eb4d7f1b782e5f2
Author: Piotr Nowojski 
Date:   2018-05-16T19:27:22Z

[FLINK-3952][runtine] Upgrade to Netty 4.1

This commit includes a possible fix for a bug in the file upload cleanup in 
FileUploadHandler and
HttpRequestHandler. For more information, see:

https://github.com/netty/netty/issues/7611




---