[GitHub] flink issue #3086: Improve docker setup

2017-01-11 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/3086
  
@kaelumania `ARG` is only available from Docker 1.10 onwards. I believe 
docker-compose also lets you configure environment variables, either through 
the compose file or via a command-line argument. If we refrained from using 
`ARG`, we could keep backwards compatibility with older Docker versions.
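For illustration only (the image, service, and variable names below are made 
up, not taken from this PR), a compose file could carry the setting:

```yaml
# docker-compose.yml (v1 format) — sketch of configuring via environment
# variables instead of a build ARG; names are purely illustrative.
jobmanager:
  image: flink
  environment:
    - JOB_MANAGER_RPC_ADDRESS=jobmanager
```

The same variable could also be overridden per invocation, e.g. with 
`docker-compose run -e JOB_MANAGER_RPC_ADDRESS=jobmanager jobmanager`.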




[GitHub] flink issue #3086: Improve docker setup

2017-01-13 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/3086
  
No, looks good. Thanks. As Greg mentioned, please open a JIRA issue next 
time. 




[GitHub] flink issue #2917: [FLINK-2821] use custom Akka build to listen on all inter...

2017-01-27 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2917
  
> edit: Re-reading the PR description, I actually got it to work by setting 
`jobmanager.rpc.address` to the external IP and getting rid of the hostname.

Glad you were able to solve the problem! :) Akka requires that all messages 
are tagged with the same address that the receiving actor system was 
initialized with. While this PR removes the problem of an address not being 
available for binding while the receiver is initialized, it still requires 
you to use a consistent address on both sides.
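In practice that means configuring the same externally reachable address on 
all machines and clients, e.g. in `flink-conf.yaml` (the IP below is only a 
placeholder):

```yaml
# flink-conf.yaml — sketch: advertise one consistent, externally reachable address
jobmanager.rpc.address: 192.0.2.10
jobmanager.rpc.port: 6123
```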




[GitHub] flink pull request: [scripts] resolve base path of symlinked execu...

2015-08-24 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1049

[scripts] resolve base path of symlinked executable

This lets Flink bootstrap correctly when it is started through a symlinked 
bin/flink executable. It's a special case, but IMHO worthwhile to consider.
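Roughly, the idea is the usual resolve-the-symlink loop before computing the 
root directory; the sketch below illustrates the approach rather than the 
exact code in this PR (variable names are illustrative, and `readlink` is 
used here for brevity):

```sh
# Sketch: follow symlinks of the invoked script before resolving the base path.
target="$0"
while [ -h "$target" ]; do
    link=$(readlink "$target")
    case "$link" in
        /*) target="$link" ;;                       # absolute symlink target
        *)  target="$(dirname "$target")/$link" ;;  # target relative to the link's dir
    esac
done
bin_dir=$(cd "$(dirname "$target")" && pwd -P)      # real bin/ directory
FLINK_ROOT_DIR=$(dirname "$bin_dir")
```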

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink symlink-startup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1049.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1049


commit e4ba6459cad193ea7103d7f7f47825e8f25065af
Author: Maximilian Michels 
Date:   2015-08-24T12:40:47Z

[scripts] resolve base path of symlinked executable






[GitHub] flink pull request: [scripts] resolve base path of symlinked execu...

2015-08-24 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1049#issuecomment-134192838
  
Unfortunately, we cannot access the config.sh file in this case, so code 
reuse is not possible. I've also seen the code you posted; it looks awfully 
hacky, and we could at least reuse my code snippet in `scripts.sh` as well.




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37748844
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Sock

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37748808
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Sock

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37748868
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
--- End diff --

Did you have to add the sleep here to make the test pass? 10 seconds is 
quite long.




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37749287
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Sock

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37749598
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Sock

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-24 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-134199343
  
@HuangWHWHW Sorry for keeping you waiting. I've made some more comments. 
Otherwise, I think this looks ready to be merged.




[GitHub] flink pull request: Framesize fix

2015-08-24 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/934#issuecomment-134200054
  
I see the point in keeping the Akka message-based transport of accumulators. 
@kl0u Sorry, I was also on vacation. Could you rebase onto the current master 
again? Then I think we can finally merge your changes.




[GitHub] flink pull request: [scripts] resolve base path of symlinked execu...

2015-08-24 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1049#issuecomment-134230260
  
Yes, the two are in sync now.




[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r37762004
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
+   public MemoryManager getMemoryManager() {
+   return null;
+   }
+
+   @Override
+   public String getTaskName() {
+   return null;
+   }
+
+   @Override
+   public String getTaskNameWithSubtasks() {
+   return null;
+   }
+
+   @Override
+   public ClassLoader getUserClassLoader() {
+ 

[GitHub] flink pull request: [scripts] resolve base path of symlinked execu...

2015-08-24 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1049#issuecomment-134298584
  
Apparently, the `readlink` utility is not part of POSIX, so we might have to 
revert to the old way of parsing the output of `ls -ld` to stay compatible. 
That is actually more reliable than I thought, because POSIX specifies that 
the `ls -ld` output for a symlink always ends in the form 
"link_name -> target".
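Something along these lines inside the resolve loop, as a sketch of the 
portable fallback (not the final patch):

```sh
# Portable replacement for `readlink "$target"`: parse the POSIX-specified
# "name -> target" form of the `ls -ld` output for a symlink.
link=$(ls -ld "$target" | sed 's/.* -> //')
```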




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37844142
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   }
+   sleep(1);
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+   assertEquals(0, source.socketSource.retries);
+
+   source.start();
+
+   Sock

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r37845122
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   public class printStreamMock extends PrintStream{
+
+   public String result;
+
+   public printStreamMock(OutputStream out) {
+   super(out);
+   }
+
+   @Override
+   public void println(String x) {
+   this.result = x;
+   }
+   }
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
  

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r37845214
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
+   public MemoryManager getMemoryManager() {
+   return null;
+   }
+
+   @Override
+   public String getTaskName() {
+   return null;
+   }
+
+   @Override
+   public String getTaskNameWithSubtasks() {
+   return null;
+   }
+
+   @Override
+   public ClassLoader getUserClassLoader() {
+ 

[GitHub] flink pull request: [scripts] resolve base path of symlinked execu...

2015-08-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1049#issuecomment-134554634
  
Here you go. Thanks for testing!




[GitHub] flink pull request: [FLINK-2555] Properly pass security credential...

2015-08-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1038#issuecomment-134563009
  
It would be great if we implemented a test case against the MiniKDC server.
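For reference, a minimal sketch of what such a test could set up, assuming 
Hadoop's MiniKdc test utility (class names are real, but the paths, principal, 
and surrounding test harness are illustrative and not from this PR):

```java
import java.io.File;
import java.util.Properties;

import org.apache.hadoop.minikdc.MiniKdc;

public class MiniKdcSketch {
    public static void main(String[] args) throws Exception {
        File workDir = new File("target/minikdc");
        workDir.mkdirs();

        // Start an embedded KDC and create a principal with a keytab.
        Properties kdcConf = MiniKdc.createConf();
        MiniKdc kdc = new MiniKdc(kdcConf, workDir);
        kdc.start();

        File keytab = new File(workDir, "client.keytab");
        kdc.createPrincipal(keytab, "client/localhost");

        // A real test would now launch a secured Flink cluster with this keytab
        // and assert that the credentials are passed through correctly.

        kdc.stop();
    }
}
```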




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37856724
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+ 

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/991#issuecomment-134565956
  
Thanks for the updates! Looks good and I think we can merge your changes.




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-134567337
  
Looks good to merge once we've further adjusted the waiting times in the tests.




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37859069
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+ 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37860912
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+   }
+
+   public void run() {
+   try {
+   this.socketSource.open(new Configuration());
+   this.socketSource.run(ctx);
+   }catch(Exception e){
+   error.set(e);
+   }
+   }
+
+   public void cancel(){
+   this.socketSource.cancel();
+   }
+   }
+
+   @Test
+   public void testSocketSourceRetryForever() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, -1);
+   source.start();
+
+   int count = 0;
+   Socket channel;
+   while (count < 100) {
+   channel = serverSo.accept();
+   count++;
+   channel.close();
+   assertEquals(0, source.socketSource.retries);
+   }
+   source.cancel();
+
+   if (error.get() != null) {
+   Throwable t = error.get();
+   t.printStackTrace();
+   fail("Error in spawned thread: " + t.getMessage());
+   }
+
+   assertEquals(100, count);
+   }
+
+   @Test
+public void testSocketSourceRetryTenTimes() throws Exception{
+   error.set(null);
+   ServerSocket serverSo = new ServerSocket(0);
+   SocketSource source = new SocketSource(serverSo, 10);
+
+ 

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37879401
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   public static int CONNECTION_RETRY_SLEEP = 1000;
--- End diff --

This shouldn't be modifiable by everyone. Please make it package-visible by 
removing the `public` modifier. Also, please keep the `final` modifier, because 
the current implementation only makes the number of retries configurable, with a 
fixed one-second retry rate. This is also documented in the user-facing API 
methods on DataStream.
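In code, that would simply be (sketch):

```java
// package-private (no access modifier) and final, with the fixed one-second retry sleep
static final int CONNECTION_RETRY_SLEEP = 1000;
```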


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2555] Properly pass security credential...

2015-08-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1038#issuecomment-134632899
  
I've opened another issue for that: 
https://issues.apache.org/jira/browse/FLINK-2573


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [hotfix] Allow setting FLINK_CONF_DIR by hand

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1057#discussion_r37894302
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -127,7 +127,7 @@ FLINK_LIB_DIR=$FLINK_ROOT_DIR/lib
 # The above lib path is used by the shell script to retrieve jars in a 
 # directory, so it needs to be unmangled.
 FLINK_ROOT_DIR_MANGLED=`manglePath "$FLINK_ROOT_DIR"`
-FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf
+if [ -z "$FLINK_CONF_DIR" ]; then 
FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; fi
--- End diff --

Maybe it's just code style, but could you make this more explicit using if-else 
blocks?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [hotfix] Allow setting FLINK_CONF_DIR by hand

2015-08-25 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1057#issuecomment-134679865
  
Very useful feature. I could also imagine passing the config file as a parameter 
to the ./bin/flink utility.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [hotfix] Allow setting FLINK_CONF_DIR by hand

2015-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1057#discussion_r37895597
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -127,7 +127,7 @@ FLINK_LIB_DIR=$FLINK_ROOT_DIR/lib
 # The above lib path is used by the shell script to retrieve jars in a 
 # directory, so it needs to be unmangled.
 FLINK_ROOT_DIR_MANGLED=`manglePath "$FLINK_ROOT_DIR"`
-FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf
+if [ -z "$FLINK_CONF_DIR" ]; then 
FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; fi
--- End diff --

Yes, you don't need the else here because the variable is set either through the 
environment or in the if block. Still, I'd prefer newlines, but that is maybe 
just a matter of taste.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37959211
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   public static int CONNECTION_RETRY_SLEEP = 1000;
--- End diff --

OK, no problem. Then make the variable non-final but don't expose it, i.e. just 
`static int CONNECTION_RETRY_SLEEP = 1000`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r37959366
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
+   public String result;
+
+   @Override
+   public void collect(String element) {
+   result = element;
+   }
+
+   @Override
+   public String toString() {
+   return this.result;
+   }
+
+   @Override
+   public void collectWithTimestamp(String element, long 
timestamp) {
+
+   }
+
+   @Override
+   public void emitWatermark(Watermark mark) {
+
+   }
+
+   @Override
+   public Object getCheckpointLock() {
+   return null;
+   }
+
+   @Override
+   public void close() {
+
+   }
+   };
+
+   public SocketTextStreamFunctionTest() {
+   }
+
+   class SocketSource extends Thread {
+
+   SocketTextStreamFunction socketSource;
+
+   public SocketSource(ServerSocket serverSo, int maxRetry) throws 
Exception {
+   this.socketSource =  new SocketTextStreamFunction(host, 
serverSo.getLocalPort(), '\n', maxRetry);
+
+   Field field = 
SocketTextStreamFunction.class.getDeclaredField("CONNECTION_RETRY_SLEEP");
+   field.setAccessible(true);
+   Field modifiersField = 
Field.class.getDeclaredField("modifiers");
+   modifiersField.setAccessible(true);
+   modifiersField.setInt(field, field.getModifiers() & 
~Modifier.FINAL);
+   field.set(null, 200);
--- End diff --

That's quite a hack. I think it is OK to remove the `final` modifier and make 
the field package-local: `static int CONNECTION_RETRY_SLEEP = 1000`. Then you 
can set it directly here.
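With that change, the reflection block above could shrink to a single line, e.g. 
(sketch, assuming the test lives in the same package as `SocketTextStreamFunction`):

```java
// shorten the retry sleep for the test; no reflection needed once the
// field is package-local and non-final
SocketTextStreamFunction.CONNECTION_RETRY_SLEEP = 200;
```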


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-26 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-134902558
  
@HuangWHWHW Thanks for your changes. Adding reflection calls to the test code is 
not good practice and makes the code hard to maintain.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r37959743
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   public class printStreamMock extends PrintStream{
+
+   public String result;
+
+   public printStreamMock(OutputStream out) {
+   super(out);
+   }
+
+   @Override
+   public void println(String x) {
+   this.result = x;
+   }
+   }
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
  

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r37962927
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   public class printStreamMock extends PrintStream{
+
+   public String result;
+
+   public printStreamMock(OutputStream out) {
+   super(out);
+   }
+
+   @Override
+   public void println(String x) {
+   this.result = x;
+   }
+   }
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+   return null;
+   }
+
+   @Override
  

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-26 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r37965403
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   public PrintStream PrintStreamOriginal = System.out;
--- End diff --

By convention, this should be lower case here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [hotfix] Allow setting FLINK_CONF_DIR by hand

2015-08-26 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1057#issuecomment-134934294
  
Could you add some documentation? Looks good to merge otherwise :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [hotfix] Allow setting FLINK_CONF_DIR by hand

2015-08-26 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1057#issuecomment-134936395
  
Ok thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-26 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/991#issuecomment-135062701
  
Thanks @HuangWHWHW. I'll merge your changes when Travis has completed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-27 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-135351424
  
While merging your pull request I noticed that the `SocketTextStreamFunction` 
actually does not wait for the time specified in `CONNECTION_RETRY_SLEEP` but 
immediately tries to reconnect in case of an EOF. It only waits in case of a 
`ConnectException`. I'm not sure whether this behavior is desired, but it should 
also be reflected in your test cases. Could you add a test case where you first 
pass a socket with an EOF and then let a `ConnectException` occur?
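Roughly, I am thinking of something like this (only a sketch that reuses the 
`SocketSource` helper and the package-visible `retries` counter from this test 
class; the timings are assumptions):

```java
ServerSocket serverSo = new ServerSocket(0);
SocketSource source = new SocketSource(serverSo, 10);
source.start();

// 1) accept and close right away: the function reads an EOF and currently
//    reconnects immediately, without sleeping
serverSo.accept().close();

// 2) close the server socket: the next connect attempt fails and the
//    function should back off for CONNECTION_RETRY_SLEEP and count a retry
serverSo.close();
sleep(500);
assertTrue(source.socketSource.retries > 0);

source.cancel();
```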


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-27 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r38076538
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   public PrintStream printStreamOriginal = System.out;
+
+   public class printStreamMock extends PrintStream{
+
+   public String result;
+
+   public printStreamMock(OutputStream out) {
+   super(out);
+   }
+
+   @Override
+   public void println(String x) {
+   this.result = x;
+   }
+   }
+
+   private Environment envForPrefixNull = new Environment() {
--- End diff --

You can replace all of this with `Mockito.mock(Environment.class)`.
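For example (sketch, assuming Mockito is on the test classpath, which it already 
is for other tests):

```java
Environment env = Mockito.mock(Environment.class);
// only stub what the test actually relies on; unstubbed methods
// return null/0 by default
Mockito.when(env.getIndexInSubtaskGroup()).thenReturn(0);
Mockito.when(env.getNumberOfSubtasks()).thenReturn(1);
```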


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-27 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/991#discussion_r38076582
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ */
+public class PrintSinkFunctionTest extends RichSinkFunction {
+
+   public PrintStream printStreamOriginal = System.out;
+
+   public class printStreamMock extends PrintStream{
+
+   public String result;
+
+   public printStreamMock(OutputStream out) {
+   super(out);
+   }
+
+   @Override
+   public void println(String x) {
+   this.result = x;
+   }
+   }
+
+   private Environment envForPrefixNull = new Environment() {
+   @Override
+   public JobID getJobID() {
+   return null;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return null;
+   }
+
+   @Override
+   public ExecutionAttemptID getExecutionId() {
+   return null;
+   }
+
+   @Override
+   public Configuration getTaskConfiguration() {
+   return null;
+   }
+
+   @Override
+   public TaskManagerRuntimeInfo getTaskManagerInfo() {
+   return null;
+   }
+
+   @Override
+   public Configuration getJobConfiguration() {
+   return null;
+   }
+
+   @Override
+   public int getNumberOfSubtasks() {
+   return 0;
+   }
+
+   @Override
+   public int getIndexInSubtaskGroup() {
+   return 0;
+   }
+
+   @Override
+   public InputSplitProvider getInputSplitProvider() {
+   return null;
+   }
+
+   @Override
+   public IOManager getIOManager() {
+  

[GitHub] flink pull request: [FLINK-2480][test]Add tests for PrintSinkFunct...

2015-08-27 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/991#issuecomment-135360612
  
Thank you @HuangWHWHW. I merged your pull request with a few minor changes 
(see new comments).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2594][client] implement a method to ret...

2015-08-28 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1072

[FLINK-2594][client] implement a method to retrieve the accumulators …

…of a job

- move SerializedValue from runtime to core
- unified code to deserialize accumulators

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink client-accumulators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1072.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1072


commit 6604abbabf8167fba86c0226dfc68626fc994848
Author: Maximilian Michels 
Date:   2015-08-28T15:01:41Z

[FLINK-2594][client] implement a method to retrieve the accumulators of a 
job

- move SerializedValue from runtime to core
- unified code to deserialize accumulators




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request:

2015-08-29 Thread mxm
Github user mxm commented on the pull request:


https://github.com/apache/flink/commit/a18994aabfba7c3775f6dc911bd9d59016216817#commitcomment-12961579
  
In 
flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java
 on line 22:
Thanks for noticing. This requires some refactoring because the test depends on 
a method in `CommonTestUtils`. Is it desirable to have a `CommonTestUtils` class 
in both runtime and core? As far as I can see, the methods are not runtime- or 
core-specific, so the two could be combined into one class residing in core.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [core][runtime] move SerializedValueTest from ...

2015-08-31 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1081

[core][runtime] move SerializedValueTest from runtime to core

- move createCopySerializable to core's CommonTestUtils
- rename CommonTestUtils createCopy to createCopyWritable
- adapt the tests to use core's CommonTestUtils where applicable

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink client-accumulators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1081.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1081


commit 60c2d7968030982bfe37f794519360a7d7e825b3
Author: Maximilian Michels 
Date:   2015-08-31T12:04:58Z

[core][runtime] move SerializedValueTest from runtime to core

- move createCopySerializable to core's CommonTestUtils
- rename CommonTestUtils createCopy to createCopyWritable
- adapt the tests to use core's CommonTestUtils where applicable




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request:

2015-08-31 Thread mxm
Github user mxm commented on the pull request:


https://github.com/apache/flink/commit/a18994aabfba7c3775f6dc911bd9d59016216817#commitcomment-12974542
  
In 
flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java
 on line 22:
@StephanEwen Do you think #1081 is feasible?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-08-31 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136357113
  
Looks good to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-136754340
  
@HuangWHWHW Thank you for addressing my comments. Could you please squash 
your commits and force push to this branch again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136758616
  
@HuangWHWHW Stephan is talking about something like this instead of the 
PrintStreamMock:

```java
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream captureStream = new PrintStream(baos);
PrintStream original = System.out;
System.setOut(captureStream);

System.out.println("Printing one line");
System.out.println("Another line");

System.setOut(original);
captureStream.close();

Assert.assertEquals("Printing one line\nAnother line\n", baos.toString());
```

You can see that we're using a `PrintStream` with a `ByteArrayOutputStream` 
here to capture the contents that are being printed to standard out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Framesize fix

2015-09-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/934#issuecomment-136765025
  
Hi @kl0u. I think something went wrong during rebasing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Framesize fix

2015-09-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/934#issuecomment-136767221
  
From the GitHub UI it looks like your pull request branch and the Flink 
master branch have diverged, i.e. your changes are no longer based on a commit 
from the master branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Framesize fix

2015-09-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/934#issuecomment-136774275
  
`git merge-base master framesize_fix` should show you the commit where the 
branches diverged. Depending on the outcome, you might have to go back to an 
earlier version of your changes and rebase those against the master. It might 
be easier to squash all your commits before rebasing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Framesize fix

2015-09-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/934#issuecomment-136781645
  
@kl0u That's fine. We'll be preparing a new milestone release based on the 
master. I think it makes sense to merge your changes afterwards for the 0.10 
release.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2607][quickstart] ignore signature file...

2015-09-02 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1085

[FLINK-2607][quickstart] ignore signature files when creating fat jar



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink quickstart

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1085.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1085


commit 9bc513129755b82a9b71b3b21c7ab4d6a0305cf8
Author: Maximilian Michels 
Date:   2015-09-02T10:21:49Z

[FLINK-2607][quickstart] ignore signature files when creating fat jar




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2607][quickstart] ignore signature file...

2015-09-02 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1085#issuecomment-137088288
  
During a Flink training, many people were experiencing this problem. I'd 
also like to merge this to the Milestone branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-03 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1073#discussion_r38621410
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -97,20 +76,43 @@ public void testPrintSinkStdErr(){
try {
printSink.open(new Configuration());
} catch (Exception e) {
-   e.printStackTrace();
+   Assert.fail();
}
printSink.setTargetToStandardErr();
printSink.invoke("hello world!");
 
assertEquals("Print to System.err", printSink.toString());
-   assertEquals("hello world!", stream.result);
+   assertEquals("hello world!\r\n", baos.toString());
--- End diff --

Same compatibility issue here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-03 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1073#discussion_r38621376
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -73,21 +51,22 @@ public void testPrintSinkStdOut(){
try {
printSink.open(new Configuration());
} catch (Exception e) {
-   e.printStackTrace();
+   Assert.fail();
}
printSink.setTargetToStandardOut();
printSink.invoke("hello world!");
 
assertEquals("Print to System.out", printSink.toString());
-   assertEquals("hello world!", stream.result);
+   assertEquals("hello world!\r\n", baos.toString());
--- End diff --

This will work on Windows (`\r\n` for newline) but not on Unix systems 
(`\n` for newline).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-03 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1073#discussion_r38621439
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ---
@@ -97,20 +76,43 @@ public void testPrintSinkStdErr(){
try {
printSink.open(new Configuration());
} catch (Exception e) {
-   e.printStackTrace();
+   Assert.fail();
}
printSink.setTargetToStandardErr();
printSink.invoke("hello world!");
 
assertEquals("Print to System.err", printSink.toString());
-   assertEquals("hello world!", stream.result);
+   assertEquals("hello world!\r\n", baos.toString());
 
printSink.close();
+   stream.close();
}
 
-   @Override
-   public void invoke(IN record) {
+   @Test
+   public void testPrintSinkWithPrefix(){
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   PrintStream stream = new PrintStream(baos);
+   System.setOut(stream);
+
+   final StreamingRuntimeContext ctx = 
Mockito.mock(StreamingRuntimeContext.class);
+   Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+   Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
 
+   PrintSinkFunction printSink = new PrintSinkFunction<>();
+   printSink.setRuntimeContext(ctx);
+   try {
+   printSink.open(new Configuration());
+   } catch (Exception e) {
+   Assert.fail();
+   }
+   printSink.setTargetToStandardErr();
+   printSink.invoke("hello world!");
+
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("2> hello world!\r\n", baos.toString());
--- End diff --

Same compatibility issue here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-03 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-137372443
  
I think there is an issue with Travis at the moment. Could you force push 
to this branch again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2475] Rename Flink Client log file

2015-09-03 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1074#issuecomment-137393037
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Implementation of distributed copying utility ...

2015-09-03 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1090#issuecomment-137477152
  
Thanks for your pull request! I'm assuming you would use this utility to copy 
files from your local file system to a remote file system, right? Your utility 
starts a Flink job to copy the files to the remote file system. This only works 
if you execute it locally, because otherwise the task managers need to have the 
files available, which might defeat the utility's purpose. Also, imagine someone 
embedding the tool in a Flink program. They might wonder why their program 
actually executes two jobs (one for the utility, one for the actual job). 

I think this would be more useful as a utility function, e.g. in a 
`FileUtils` class in `flink-core`. The method there would receive a list of 
files and then upload them using Flink's `FileSystem` abstraction, as you did. 
We could still parallelize the method by starting multiple threads 
to upload the files.
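To illustrate the direction, here is a rough, untested sketch of such a helper 
(the class and method names are made up; it assumes Flink's `FileSystem`/`Path` 
API, i.e. `Path#getFileSystem()`, `FileSystem#open(Path)` and 
`FileSystem#create(Path, boolean)`):

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ParallelFileCopy {

	/** Copies each source file into targetDir, using up to 'parallelism' threads. */
	public static void copy(final List<Path> sources, final Path targetDir, int parallelism) throws Exception {
		final FileSystem targetFs = targetDir.getFileSystem();
		ExecutorService pool = Executors.newFixedThreadPool(parallelism);
		List<Future<Void>> futures = new ArrayList<Future<Void>>();

		for (final Path source : sources) {
			futures.add(pool.submit(new Callable<Void>() {
				@Override
				public Void call() throws Exception {
					FileSystem sourceFs = source.getFileSystem();
					Path target = new Path(targetDir, source.getName());
					InputStream in = sourceFs.open(source);
					OutputStream out = targetFs.create(target, true);
					try {
						byte[] buffer = new byte[8192];
						int read;
						while ((read = in.read(buffer)) != -1) {
							out.write(buffer, 0, read);
						}
					} finally {
						in.close();
						out.close();
					}
					return null;
				}
			}));
		}

		for (Future<Void> future : futures) {
			future.get(); // re-throws exceptions from the copy threads
		}
		pool.shutdown();
	}
}
```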

Correct me if I'm wrong or misunderstood your pull request :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Implementation of distributed copying utility ...

2015-09-03 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1090#issuecomment-137499875
  
Thanks for pointing me to the `distcp` page. So far, I wasn't aware of this 
tool :) The performance of Hadoop and Flink should not differ much here because 
copying files is mostly IO-bound work. Still, it is 1.5 minutes faster.

Not sure if we can include your code in the Flink examples, but it would 
definitely fit under `flink-contrib`, where we usually put external tools that 
are not directly part of Flink.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Implementation of distributed copying utility ...

2015-09-04 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1090#issuecomment-137760319
  
Yes, I guess it is a better fit for the examples.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1320] Add an off-heap variant of the ma...

2015-09-07 Thread mxm
Github user mxm closed the pull request at:

https://github.com/apache/flink/pull/290


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1320] Add an off-heap variant of the ma...

2015-09-07 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/290#issuecomment-138298479
  
Closing this to continue the discussion at #1093.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1093#issuecomment-138480430
  
That's an impressive pull request, @StephanEwen! At first glance, it 
looks very well thought out. I would be glad to work on some of the remaining 
issues like the config entries or the MiniCluster tests. In addition, I would 
like to add some nightly performance tests.

Is it fair to say that off-heap memory doesn't add much value for small JVM 
heaps? So the default Flink execution mode would use heap memory, and one could 
switch to off-heap memory for setups with a very large JVM heap.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2567]

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1059#issuecomment-138482074
  
Thanks for your contribution @tammymendt. Your pull request has been merged 
but the auto-close didn't work :) Could you close the pull request?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2567]

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1059#issuecomment-138484565
  
Oh sorry, that was a reference from Stephan's repository. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-138486624
  
@HuangWHWHW You can use `System.lineSeparator()` to get either `\n` or 
`\r\n`, depending on the operating system.
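For example (sketch, using the `baos` capture stream from the test):

```java
String newline = System.lineSeparator();
assertEquals("hello world!" + newline, baos.toString());
```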


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2567]

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1059#issuecomment-138487601
  
@tammymendt Sorry for the confusion. Could you please re-open the pull 
request? I thought it had been merged but it was only merged into Stephan's 
repository...


---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-08 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r38904579
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.DataOutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
--- End diff --

Could you replace this with a simple Mock using Mockito? 
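Something along these lines (just a sketch; the `String` type parameter is an 
assumption about the records the test emits, and the class name is made up):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.junit.Test;

public class SourceContextMockSketch {

    @Test
    @SuppressWarnings("unchecked")
    public void mockReplacesAnonymousSourceContext() {
        // One line instead of the hand-written anonymous SourceContext implementation.
        SourceContext<String> ctx = mock(SourceContext.class);

        // The function under test would emit records through ctx;
        // afterwards the interactions can be verified.
        ctx.collect("hello");
        verify(ctx).collect("hello");
    }
}
```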


---


[GitHub] flink pull request: [FLINK-2567]

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1059#issuecomment-138493194
  
Now it's merged. This time for real :)


---


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-138494607
  
Looks good. Will merge this later on.


---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-08 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r38909392
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.DataOutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link 
org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+   final AtomicReference error = new 
AtomicReference();
+   private final String host = "127.0.0.1";
+
+   SourceFunction.SourceContext ctx = new 
SourceFunction.SourceContext() {
--- End diff --

You're right, you need to capture the input to the collect method. How 
about using an `ArgumentCaptor`? 
http://mockito.googlecode.com/svn/tags/1.8.0/javadoc/org/mockito/ArgumentCaptor.html
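A sketch of how that could look (again assuming `String` records; the class 
and method names are illustrative, not taken from the pull request):

```java
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Arrays;

import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

public class CollectArgumentCaptorSketch {

    @Test
    @SuppressWarnings("unchecked")
    public void capturesRecordsPassedToCollect() {
        SourceContext<String> ctx = mock(SourceContext.class);

        // The function under test would emit records through ctx;
        // simulate two emissions here.
        ctx.collect("first");
        ctx.collect("second");

        // Capture every argument passed to collect() and assert on the values.
        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
        verify(ctx, times(2)).collect(captor.capture());
        assertEquals(Arrays.asList("first", "second"), captor.getAllValues());
    }
}
```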


---


[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1093#issuecomment-138506847
  
Let's merge this without changing the current state of affairs, i.e. 
keeping on-heap memory management as the default. Then we should file a JIRA to 
keep track of the remaining (minor) open issues.
- Getting `taskmanager.memory.off-heap`, `taskmanager.memory.size`, and 
`taskmanager.memory.off-heap-ratio` to work
- Integrating with the YarnTaskManagerRunner.
- Adding a test case for the FlinkMiniCluster


---


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-138544981
  
The test failures are unrelated to your changes.


---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-138608134
  
Nice :) Merging...


---


[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/858#issuecomment-138625172
  
I've ported this pull request to the latest master. It was a lot more work 
than I anticipated because some classes had diverged significantly and merging 
them was a bit hard.

Due to some refactoring, the changes have grown quite large again and I 
know that makes reviewing hard. Despite that, I wouldn't delay merging this 
pull request much further. We can disable the session management until it is 
integrated with the rest of the system (intermediate results) by throwing an 
exception on the interface methods. If we decide later that we want to delay 
this feature, we could also remove the session code. Even in that case, it 
would still make sense to merge this pull request because it contains a lot of 
nice refactoring.

With the session management in place, we can reuse already computed 
intermediate results without too much effort. Actually, only a few API changes 
remain to expose the session management to users in production.


---


[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-09 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/858#discussion_r39017774
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 ---
@@ -53,41 +54,41 @@ class JobManagerFailsITCase(_system: ActorSystem)
   }
 
   "A TaskManager" should {
-"detect a lost connection to the JobManager and try to reconnect to 
it" in {
-
-  val num_slots = 13
-  val cluster = startDeathwatchCluster(num_slots, 1)
-
-  val tm = cluster.getTaskManagers(0)
-  val jmGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
-
-  // disable disconnect message to test death watch
-  tm ! DisableDisconnect
-
-  try {
-within(TestingUtils.TESTING_DURATION) {
-  jmGateway.tell(RequestNumberRegisteredTaskManager, self)
-  expectMsg(1)
-
-  tm ! NotifyWhenJobManagerTerminated(jmGateway.actor)
-
-  jmGateway.tell(PoisonPill, self)
-
-  expectMsgClass(classOf[JobManagerTerminated])
-
-  cluster.restartLeadingJobManager()
-
-  cluster.waitForTaskManagersToBeRegistered()
-
-  cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
-.tell(RequestNumberRegisteredTaskManager, self)
-
-  expectMsg(1)
-}
-  } finally {
-cluster.stop()
-  }
-}
+//"detect a lost connection to the JobManager and try to reconnect to 
it" in {
--- End diff --

Thanks.


---


[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/858#issuecomment-138848287
  
Of course! The following classes have been refactored in the course of 
integrating them with the session management:

**Client**
- Establish connection to JobManager on creation
- Refactor run method into `runBlocking` and `runDetached`
- Extract helper classes to generate the Plan
- Make Optimizer and JobGraph generation methods `static`
- Pass `ClassLoader` correctly (do not keep one per Client but rather let 
it be passed before submission)

**CliFrontend**
- `runBlocking` and `runDetached` methods by analogy with the Client class

**ExecutionEnvironment**, **LocalEnvironment**, **RemoteEnvironment**
- modified abstract class to support sessions (timeout and jobID generation)
- handle session management via Reapers and ShutdownHooks

**PlanExecutor**, **LocalExecutor**, **RemoteExecutor**
- modified interface
- support session termination
- set JobID on Plan

**JobManager**
- keep ExecutionGraph as long as session has not expired

Future issues:
- Support for sessions in streaming. Currently streaming jobs are agnostic 
of sessions.
- Representation of sessions in the JobManager web frontend. How do we 
represent updates to the ExecutionGraph in sessions?
- Build features on top of session management (e.g. intermediate results)


---


[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138863134
  
Hi @tammymendt. Thanks for the pull request! The accumulators work a little 
differently now: they are accumulated on a per-task basis and reported to the 
job manager at regular intervals.

The `clone()` method in `OperatorStatistics` doesn't create a deep copy of 
the object, i.e. some references are reused. That causes problems because the 
runtime accumulators are modified while they are being merged for reporting to 
the job manager.

I could make the test pass with a nasty deep copy via Java serialization. 
However, I didn't manage to make a proper copy using the provided interfaces. 
I think you can probably do that faster because you know the code very well.


---


[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138866235
  
I tried again, this works:

```java
@Override
public OperatorStatistics clone() {
    OperatorStatistics clone = new OperatorStatistics(config);
    clone.min = min;
    clone.max = max;
    clone.cardinality = cardinality;

    try {
        ICardinality copy;
        if (countDistinct instanceof LinearCounting) {
            copy = new LinearCounting(config.getCountDbitmap());
        } else if (countDistinct instanceof HyperLogLog) {
            copy = new HyperLogLog(config.getCountDlog2m());
        } else {
            throw new IllegalStateException("Unsupported counter.");
        }
        clone.countDistinct = copy.merge(countDistinct);
    } catch (CardinalityMergeException e) {
        throw new RuntimeException("Failed to clone OperatorStatistics!");
    }

    try {
        HeavyHitter copy;
        if (heavyHitter instanceof LossyCounting) {
            copy = new LossyCounting(config.getHeavyHitterFraction(), config.getHeavyHitterError());
        } else if (heavyHitter instanceof CountMinHeavyHitter) {
            copy = new CountMinHeavyHitter(config.getHeavyHitterFraction(),
                    config.getHeavyHitterError(),
                    config.getHeavyHitterConfidence(),
                    config.getHeavyHitterSeed());
        } else {
            throw new IllegalStateException("Unsupported counter.");
        }
        copy.merge(heavyHitter);
        clone.heavyHitter = copy;
    } catch (HeavyHitterMergeException e) {
        throw new RuntimeException("Failed to clone OperatorStatistics!");
    }

    return clone;
}
```

Do you think we could merge your pull request with this change?


---


[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138895326
  
Yes, there are tests in place for the new accumulators. The included 
accumulators all implement a deep copy in the `clone()` method, which is why 
they didn't have to be ported. You don't need to include an extra test for the 
runtime accumulators.


---


[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138895569
  
Could you update the pull request with the new `clone()` method? I will 
then merge the pull request if it passes Travis.


---


[GitHub] flink pull request: [FLINK-2645][jobmanager] forward exceptions wh...

2015-09-09 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1112

[FLINK-2645][jobmanager] forward exceptions when merging final accumulator 
results

- fix forwarding
- add test case

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink accumulator-exceptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1112.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1112


commit 387fa5e4021185753c950498b1c2e7a8d44223e5
Author: Maximilian Michels 
Date:   2015-09-09T11:49:40Z

[FLINK-2645][jobmanager] forward exceptions when merging final accumulators

- add test case




---


[GitHub] flink pull request: [FLINK-1730]Persist operator on Data Sets

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1083#issuecomment-138923111
  
@sachingoel0101 I opened a pull request some time ago to backtrack 
intermediate results on the network layer and then "backtrack" them at 
scheduling time: #640. At that time, I realized that this doesn't make much 
sense without proper session management.


---


[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138937490
  
Looks good. Merging this now.


---


[GitHub] flink pull request: Implementation of distributed copying utility ...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1090#issuecomment-138940821
  
Yes, this failed check is unrelated to your changes.


---


[GitHub] flink pull request: Implementation of distributed copying utility ...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1090#issuecomment-138942223
  
Should we bundle the utility into a JAR like the other examples? If so, we 
need to adjust the `pom.xml` file in flink-examples.


---


[GitHub] flink pull request: [FLINK-2645][jobmanager] forward exceptions wh...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1112#issuecomment-138976625
  
Thanks for the review. I've since extended this pull request. Upon failure 
to deserialize the final task accumulator results, we now fail the 
`Executions` and forward the exception instead of just logging it.


---


[GitHub] flink pull request: [FLINK-2632] Web Client does not respect the c...

2015-09-10 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1114#issuecomment-139232427
  
+1

I think the JobSubmissionServlet's interaction with the CliFrontend is a 
bit hacky. We should probably refactor this in the future. But that's a 
separate concern. Thanks for the fix.


---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-14 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1125

[FLINK-2641] integrate off-heap memory configuration

- remove taskmanager.heap.mb and jobmanager.heap.mb
- introduce taskmanager.memory.size and jobmanager.memory.size
  which allow to control total combined heap and off-heap memory

- add offheap configuration parameter taskmanager.memory.off-heap
- remove offheap ratio parameter and reuse memory fraction parameter

- rename taskmanager.memory.fraction to taskmanager.memory.managed.fraction
- rename taskmanager.memory.size to taskmanager.memory.managed.size

- set JVM -XX:MaxDirectMemorySize parameter correctly

- remove number of network buffers config entry
- allow configuration of network memory size via new entry
  taskmanager.memory.network.size
- adapt code and tests to changes

- remove deprecated ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY

- fail when obsolete config entries are detected

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink off_heap_config

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1125.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1125


commit 05e737034019b2edbb33f9372ba8389589f87eb4
Author: Maximilian Michels 
Date:   2015-09-11T14:07:22Z

[FLINK-2641] integrate off-heap memory configuration

- remove taskmanager.heap.mb and jobmanager.heap.mb
- introduce taskmanager.memory.size and jobmanager.memory.size
  which allow to control total combined heap and off-heap memory

- add offheap configuration parameter taskmanager.memory.off-heap
- remove offheap ratio parameter and reuse memory fraction parameter

- rename taskmanager.memory.fraction to taskmanager.memory.managed.fraction
- rename taskmanager.memory.size to taskmanager.memory.managed.size

- set JVM -XX:MaxDirectMemorySize parameter correctly

- remove number of network buffers config entry
- allow configuration of network memory size via new entry
  taskmanager.memory.network.size
- adapt code and tests to changes

- remove deprecated ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY

- fail when obsolete config entries are detected




---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-14 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1125#issuecomment-140022781
  
In addition to the discussion in the JIRA, I had to change the way the 
network memory is configured because otherwise I could not calculate the direct 
memory size correctly. The default network buffer size is not easy to get in 
the startup scripts without also putting it in the config. Besides that, I 
think it will be much easier for users to configure the network memory size 
using an absolute value rather than some obscure `numberOfNetworkBuffers` count.

> We have to be clear and careful with the term managed memory also, as the 
regular JVM heap is managed memory (managed by the JVM / GC) as opposed to the 
unmanaged memory in a C++ program.

In Flink, we always use the term managed memory to refer to our own memory 
management. So I think it makes sense to stick to that term. The former name 
`taskmanager.memory.size` was not clearer in any way because one would think 
that this refers to the total memory size rather than the Flink managed memory 
size.


---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-14 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1125#issuecomment-140029554
  
That's why the pull request adds a check for obsolete keys which makes Flink 
fail to start when old config entries are detected. In addition, it has sanity 
checks that verify the managed memory size is smaller than the combined heap 
and off-heap memory size. To make things clearer, we could use 
taskmanager.jvm.size (although the actual size depends on many other things).


---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-14 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1125#issuecomment-140030087
  
Yes, that will follow. I wanted to kick off the discussion first because I 
had a gut feeling that the pull request would trigger more discussion than the 
JIRA did once it was opened.


---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-14 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1125#issuecomment-140035559
  
How about we rename `taskmanager.memory.size` to `taskmanager.memory`? Then 
there wouldn't be any possible clash. I could also live with 
`taskmanager.jvm.memory.size`.


---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-14 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1129

[FLINK-2641] integrate off-heap memory configuration

Following the discussion in #1125, this pull request introduces a less 
"invasive" change to make the off-heap memory configurable.

- add offheap configuration parameter taskmanager.memory.off-heap

- remove offheap ratio parameter and reuse memory fraction parameter

- set JVM -XX:MaxDirectMemorySize parameter correctly
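For illustration, this is roughly how the new switch could be set 
programmatically, e.g. for a local setup (a sketch: `taskmanager.memory.off-heap` 
mirrors the key above, while the fraction key and its value are assumptions):

```java
import org.apache.flink.configuration.Configuration;

public class OffHeapConfigSketch {

    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Let Flink allocate its managed memory as off-heap (direct) memory.
        config.setBoolean("taskmanager.memory.off-heap", true);

        // The existing memory fraction is reused instead of a separate
        // off-heap ratio parameter.
        config.setFloat("taskmanager.memory.fraction", 0.7f);

        System.out.println(config.getBoolean("taskmanager.memory.off-heap", false));
    }
}
```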

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink off_heap_config-lite

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1129.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1129


commit 7f6794cbf6cbec9c666f44148cf4b3227fad217a
Author: Maximilian Michels 
Date:   2015-09-14T13:07:23Z

[FLINK-2641] integrate off-heap memory configuration

- add offheap configuration parameter taskmanager.memory.off-heap

- remove offheap ratio parameter and reuse memory fraction parameter

- set JVM -XX:MaxDirectMemorySize parameter correctly




---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-14 Thread mxm
Github user mxm closed the pull request at:

https://github.com/apache/flink/pull/1125


---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-14 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1125#issuecomment-140148408
  
Closing this in favor of #1129. For further discussion please see 
https://issues.apache.org/jira/browse/FLINK-2667.


---

