[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r38906129 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.io.DataOutputStream; +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReference error = new AtomicReference(); + private final String host = "127.0.0.1"; + + SourceFunction.SourceContext ctx = new SourceFunction.SourceContext() { --- End diff -- The ctx is used to get the received msg from socket server. So I override the toString() method in ctx. I think it will be invalid if we use mockito to check the received msg is correct. Or is it unnecessary to check the msg since this test is for the retry times? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r38909392 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.io.DataOutputStream; +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReference error = new AtomicReference(); + private final String host = "127.0.0.1"; + + SourceFunction.SourceContext ctx = new SourceFunction.SourceContext() { --- End diff -- You're right, you need to capture the input to the collect method. How about using an `ArgumentCaptor`? http://mockito.googlecode.com/svn/tags/1.8.0/javadoc/org/mockito/ArgumentCaptor.html --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-138518846 @mxm Hi, I added the ArgumentCaptor to the test and removed the unwanted code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-138608134 Nice :) Merging... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/992 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-138268476 @mxm @StephanEwen Hi, very sorry for bothering. I got the CI passed. Is there any new comment or this can be merged? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-137745160 @mxm Ok, I make the CI rerun. Any new comment? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-137372443 I think there is an issue with Travis at the moment. Could you force push to this branch again? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-136910004 @mxm @StephanEwen Hi, I just update one commit yesterday. And I found that a few PRs got the same trouble yesterday. Is there any issue in CI? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r38409485 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -42,11 +42,13 @@ private boolean retryForever; private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; - private static final int CONNECTION_RETRY_SLEEP = 1000; + static int CONNECTION_RETRY_SLEEP = 1000; + protected long retries; private volatile boolean isRunning; public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) { + this.retries = 0; --- End diff -- Initialization to `0` is not needed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r38414236 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -42,11 +42,13 @@ private boolean retryForever; private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; - private static final int CONNECTION_RETRY_SLEEP = 1000; + static int CONNECTION_RETRY_SLEEP = 1000; + protected long retries; private volatile boolean isRunning; public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) { + this.retries = 0; --- End diff -- Hi, I removed this in my new commit. Otherwise why doesn`t the CI run? This happen in all of my new commits. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-136754340 @HuangWHWHW Thank you for addressing my comments. Could you please squash your commits and force push to this branch again? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-136739055 I am not sure why the CI is not retesting this. Can you try to squash your commits into one commit and force-push this branch? This always triggers CI for me... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-136285250 @mxm Hi, max. Any comment about my new changes? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-135429969 @mxm Hi, I add a test that first send a message to the SocketTextStreamFunction and this is success. Then I close the server let the SocketTextStreamFunction retry. Is it you need? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-135351424 While merging your pull request I noticed that the `SocketTextStreamFunction` actually does not wait the time specified in `CONNCTION_RETRY_SLEEP` but immediately tries to reconnect in case of an EOF. It only waits in case of a `ConnectionError`. I'm not sure whether this behavior is desired but this should also be reflected in your test cases. Could you add a test case where you first pass a Socket with an EOF and then let an `ConnectionError` occur? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37963338 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) throws Exception { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + + Field field = SocketTextStreamFunction.class.getDeclaredField(CONNECTION_RETRY_SLEEP); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField(modifiers); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() ~Modifier.FINAL); + field.set(null, 200); --- End diff -- ... Sorry for that. I changed this in my new one. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37959211 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -42,11 +42,13 @@ private boolean retryForever; private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; - private static final int CONNECTION_RETRY_SLEEP = 1000; + public static int CONNECTION_RETRY_SLEEP = 1000; --- End diff -- Ok no problem. Then make the variable non-final but don't expose it. So just have it `static int CONNECTION_RETRY_SLEEP = 1000`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-134902558 @HuangWHWHW Thanks for your changes. Adding reflection calls to the testing codes is not good practice and makes the code hard to maintain. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37959366 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) throws Exception { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + + Field field = SocketTextStreamFunction.class.getDeclaredField(CONNECTION_RETRY_SLEEP); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField(modifiers); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() ~Modifier.FINAL); + field.set(null, 200); --- End diff -- That's quite a hack. I think it is ok to remove the `final` modifier and make the field variable package-local: `static int CONNECTION_RETRY_SLEEP = 1000`. Then you can set it directly here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-135262313 Hi,@mxm any new comment? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37879401 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -42,11 +42,13 @@ private boolean retryForever; private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; - private static final int CONNECTION_RETRY_SLEEP = 1000; + public static int CONNECTION_RETRY_SLEEP = 1000; --- End diff -- This shouldn't be modifiable by everyone. Please make it just package-visible by removing the `public` modifier. Also, please keep the `final` modifier because the current implementation just lets the number of retries be configurable with a fixed 1 second retry rate. This is also documented in the user-facing API methods on DataStream. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-134797100 @mxm Hi,I chage the CONNECTION_RETRY_SLEEP to static final int CONNECTION_RETRY_SLEEP = 1000; But I have no idea to straightly changing the CONNECTION_RETRY_SLEEP in my test using: SocketTextStreamFunction.CONNECTION_RETRY_SLEEP = 200. So, I add a reflection mechanism to resolve this. And now the CONNECTION_RETRY_SLEEP changes to 200 in my test. Would you please to take a look whether it is correct? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37843364 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0, source.socketSource.retries); + + source.start(); + + Socket channel; +
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37845490 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0, source.socketSource.retries); + + source.start(); + + Socket channel; +
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-134526309 Hi, I take a new change that decrease the sleep time and clear the error for each test cases. But I have no idea to control the value of retry to increase. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37844142 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0, source.socketSource.retries); + + source.start(); + + Socket channel; + channel =
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37856724 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + assertEquals(0, source.socketSource.retries); + } + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0,
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37937505 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -42,11 +42,13 @@ private boolean retryForever; private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; - private static final int CONNECTION_RETRY_SLEEP = 1000; + public static int CONNECTION_RETRY_SLEEP = 1000; --- End diff -- But if I add the final, it will be a error in my test: Cannot assign a value to final variable CONNECTION_RETRY_SLEEP. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-134493928 @mxm It doesn`t matter. I`ll take a new change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37861790 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + assertEquals(0, source.socketSource.retries); + } + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0,
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-134570319 Hi, I decreased both the waiting time and the retry times since it will still cost over 10 seconds if only the waiting time is decreased due to the Thread.sleep(CONNECTION_RETRY_SLEEP);. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37860526 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + assertEquals(0, source.socketSource.retries); + } + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0,
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37860912 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + assertEquals(0, source.socketSource.retries); + } + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0,
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-134567337 Looks good to merge if we further adjust the waiting time of the tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37858063 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + assertEquals(0, source.socketSource.retries); + } + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0,
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37859069 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + assertEquals(0, source.socketSource.retries); + } + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0,
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37860800 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + assertEquals(0, source.socketSource.retries); + } + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0,
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37864215 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + assertEquals(0, source.socketSource.retries); + } + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + error.set(null); + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0,
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37748868 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); --- End diff -- Did you have to add the sleep here to make the test pass? 10 seconds is quite long. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37749287 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0, source.socketSource.retries); + + source.start(); + + Socket channel; + channel =
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-134199343 @HuangWHWHW Sorry for keeping you waiting. I've made some more comments. Otherwise, I think this looks ready to be merged. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37749598 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0, source.socketSource.retries); + + source.start(); + + Socket channel; + channel =
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37748844 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0, source.socketSource.retries); + + source.start(); + + Socket channel; + channel =
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37748808 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0, source.socketSource.retries); + + source.start(); + + Socket channel; + channel =
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37762121 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); --- End diff -- You should clear the error for subsequent test cases. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-132982117 Hi Max, I am very sorry to bothered you. I fixed some of my PRs and was waiting for your reply for days. Otherwise, as your advice, I added a change about the sink retry and take tests for it. I would be very honored if you can spend a little time to take a look about these PRs. Thank you! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-133238272 @tillrohrmann Thank you for the info :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-132999197 @HuangWHWHW, Max is currently on vacations. He'll be back next week. I'm sure that he'll get back to you then :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130999638 Hi , I did a new change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955259 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -69,14 +68,14 @@ public void run(SourceContextString ctx) throws Exception { public void streamFromSocket(SourceContextString ctx, Socket socket) throws Exception { try { - StringBuffer buffer = new StringBuffer(); + StringBuilder buffer = new StringBuilder(); BufferedReader reader = new BufferedReader(new InputStreamReader( socket.getInputStream())); while (isRunning) { - int data; + String data; try { - data = reader.read(); + data = reader.readLine(); --- End diff -- Please use `read()` because of the custom delimiter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955189 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -40,10 +37,12 @@ private char delimiter; private long maxRetry; private boolean retryForever; + private boolean isRetrying = false; private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + private volatile boolean isExit = false; --- End diff -- Is this flag necessary? We have `isRunning` already. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955316 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -85,11 +84,12 @@ public void streamFromSocket(SourceContextString ctx, Socket socket) throws Ex } } - if (data == -1) { + if (data == null) { socket.close(); long retry = 0; boolean success = false; - while (retry maxRetry !success) { + while ((retry maxRetry || (retryForever !isExit)) !success) { + isRetrying = true; --- End diff -- This flag is only necessary for your test and thus should be removed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955334 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -145,4 +152,8 @@ public void cancel() { } } } + + public boolean getIsRetrying() { --- End diff -- Please remove this getter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36955221 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -69,14 +68,14 @@ public void run(SourceContextString ctx) throws Exception { public void streamFromSocket(SourceContextString ctx, Socket socket) throws Exception { --- End diff -- I think this method should be private because it is not meant to be used outside this class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130644950 `StringBuilder` is only for single-threaded while `StringBuffer` enables multi-thread access. If you use `StringBuffer` in a single-threaded scenario it has worse performance than `StringBuilder`. Thanks for you changes. In addition to the infinity test, can you add a test that checks for a certain number of retries (e.g. 10)? Also please add a check for 1 and 0 retries. It's always good to test corner cases :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130588652 Thanks for your changes. I think we should use `read()` instead of `readLine()` because we are using a custom delimiter and not necessarily \n (newline symbol). The danger of reading an entire line is that the newline symbol might never arrive. So it might continue to read forever. And even if it manages to find a newline symbol, you have to truncate your input to find the custom delimiter. That's not very efficient. Can you change the code back to using the `read()` method? I think we had a misunderstanding. For you test case: It's not considered good practice to mix production and test code. You're doing that by introducing the `isRetrying` flag and exposing it. Alternatively, you have two options: 1. Create a `ServerSocket` and pass its address to the `SocketTextStreamFunction`. Then control the connection to this socket and count how often the function reconnects (e.g. use the `accept()` method). 2. Create your test in the same package as the `SocketTextStreamFunction` function (package is `org.apache.flink.streaming.api.functions.source`). Then you can access all field variables which are protected. So make your `retries` variable a protected field variable of the `SocketTextStreamFunction` class. I hope that this helps you. If not, feel free to ask more questions. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130524804 @mxm Hi, I fixed the StringBuffer and add the test. Take a look whether it`s correct. Thank you! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130626843 Hi Max, I fixed all as your reviews. And I retained the change of StringBuffer to StringBuilder. There is a question that as I see the StringBuilder just do the same thing as StringBuffer currently. So what`s the real different the two type in the SocketTextStreamFunction? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130664351 @mxm Otherwise, I found the SocketClientSink didn`t have the retry. Is it necessary to get a retry? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973279 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + source.start(); + + Socket channel; + channel = serverSo.accept(); + channel.close(); +
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973254 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReferenceThrowable error = new AtomicReferenceThrowable(); + private final String host = 127.0.0.1; + + SourceFunction.SourceContextString ctx = new SourceFunction.SourceContextString() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(1); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail(Error in spawned thread: + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test +public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + source.start(); + + Socket channel; + channel = serverSo.accept(); + channel.close(); +
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973430 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -43,6 +43,7 @@ private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + protected long retrys; --- End diff -- You should also initialize the variable with 0 in the `open()` method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r36973377 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -43,6 +43,7 @@ private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + protected long retrys; --- End diff -- Sorry, minor thing but it should be `retries`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37042816 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java --- @@ -43,6 +43,7 @@ private Socket socket; private static final int CONNECTION_TIMEOUT_TIME = 0; private static final int CONNECTION_RETRY_SLEEP = 1000; + protected long retrys; --- End diff -- Sorry for my English. :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130663511 Ok, I add two cases(retry 10 and 0) since I thought retry 1 time just same as 10. And would you please take a look with another two tests(https://github.com/apache/flink/pull/991 and https://github.com/apache/flink/pull/977)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130672880 Otherwise, I found the SocketClientSink didn`t have the retry. Is it necessary to get a retry? Yes, that might be an issue but let's keep it separate from our concern here. If you want, you can open a JIRA issue for the missing retry option in the `SocketClientSink`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130217420 @mxm Hi, thank you for suggestions. I will try to follow your suggestions and improve the test. I understand almost of yours and I also read the Class documentation of BufferedReader.read(). When I was doing the test I found the BufferedReader.read() would never stop until it read next char from socket server or throw a Exception when socket is closed. Returning -1 in BufferedReader.read() seems to be only worked in text file instead socket message. And I looked for help in the net that some guys said you might add a method(Socket.setSoTimeout()) so the BufferedReader.read() would stop. But this way is not satisfied neither since it would throw a exception. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130213092 @HuangWHWHW `read()` method of the `BufferedReader` object returns `-1` in case the end of the stream has been reached. A couple of things I noticed apart from the `retryForever` issue. I wonder if we can fix these with this pull request as well: 1. The control flow of the `streamFromSocket` function is hard to predict because there are many `while` loops with `break`, `continue`, or `throw` statements. 2. We could use `StringBuilder` instead of `StringBuffer` in this class. `StringBuilder` is faster in the case of single-threaded access. 3. The function reads a single character at a time from the socket. It is more efficient to use a buffer and read several characters at once. @HuangWHWHW You asked how you could count the number of retries in a unit test. Typically, you would insert a `Mock` or a `Spy` into your test method. Unfortunately, this does not work here because the socket variables is overwritten in case of a retry. So for this test, I would recommend creating a local `ServerSocket` and let the function connect to this socket. You can then control the failures from your test socket. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130222078 Actually point 3 is not so bad because we're using a buffered reader that fills the buffer and does not read a character from the socket on every call to `read()`. The `read()` method may throw an Exception or return -1. So we need to handle both of these cases. If closed properly, the socket should send the EOF event and the `read()` method returns -1. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130206658 @mxm @StephanEwen Hi, I do a test for this today and I got another problem. The SocketTextStreamFunction use BufferedReader.read() to get the buffer which is sent by socket server. And whether this function BufferedReader.read() will never return -1 as the end of the sent message? If it was there should be another bug that code following will never be reachable: if (data == -1) { socket.close(); long retry = 0; boolean success = false; while ((retry maxRetry || retryForever) !success) { if (!retryForever) { retry++; } LOG.warn(Lost connection to server socket. Retrying in + (CONNECTION_RETRY_SLEEP / 1000) + seconds...); try { socket = new Socket(); socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); success = true; } catch (ConnectException ce) { Thread.sleep(CONNECTION_RETRY_SLEEP); socket.close(); } } if (success) { LOG.info(Server socket is reconnected.); } else { LOG.error(Could not reconnect to server socket.); break; } reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); continue; } --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130228505 Hi, there are two more questions: 1.In using StringBuilder, does it mean that we should use BufferedReader.readLine() instead of BufferedReader.read()? 2.Could you tell me how to make the BufferedReader.read() return -1? I tried many ways that all filed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130237642 1.In using StringBuilder, does it mean that we should use BufferedReader.readLine() instead of BufferedReader.read()? Reading by character is the way to go if we use a custom `delimiter`. If our delimiter was `\n` then it would be ok to read entire lines. Could you tell me how to make the BufferedReader.read() return -1? I tried many ways that all filed. Ok :) Here is a minimal working example where `read()` returns `-1`: ```java public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(12345); final SocketAddress socketAddress = socket.getLocalSocketAddress(); new Thread(new Runnable() { @Override public void run() { Socket socket = new Socket(); try { socket.connect(socketAddress); } catch (IOException e) { e.printStackTrace(); } try { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); System.out.println((bufferedReader.read())); } catch (IOException e) { e.printStackTrace(); } } }).start(); Socket channel = socket.accept(); channel.close(); } ``` Output: ``` -1 ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-130311909 Thank you! I`ll try again. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-129864531 @HuangWHWHW `retryForever` is just a convenience variable for `maxRetry 0`. Your fix is correct because the loop will only execute if `maxRetry 0` and thus not execute at all if it should retry forever. It would be great if you added a test that checks for the correct number of retries. In case of infinite retries, just check up to a certain number (e.g. 100 retries). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-129875331 @mxm Ok, I`ll add a test. There is a little difficult that I can`t get the retry times in test since the retry is a local variable. So can I add a function to get the retry times? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-129571942 This fix looks valid. Can it be included in an extended test for the socket function? Something that validates that the function properly tries to reconnect? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-129667322 @StephanEwen Hi,yes,I plan to add a test for it. However, the test may be failed since the retryForever in the flink-master is also unworked currently. Will the test I`d add run together with this PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-128309202 If you think it was necessary why was your first step to remove it's usage... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-128322151 Hah Sorry, this thought was generated after this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
GitHub user HuangWHWHW opened a pull request: https://github.com/apache/flink/pull/992 [FLINK-2490][FIX]Remove the retryForever check in function streamFrom⦠In the class SocketTextStreamFunction, the var retryForever only be set in the line this.retryForever = maxRetry 0;(SocketTextStreamFunction.java:54). When the program executes this âwhile (retry maxRetry !success) â it means the maxRetry 0 and the retryForever will always be false. So it`s unnecessary to judge whether retryForever be false. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HuangWHWHW/flink FLINK-2490 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/992.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #992 commit 25a41d0f462add2f11c6830ec8db518702f8dbb5 Author: HuangWHWHW 404823...@qq.com Date: 2015-08-06T08:55:24Z [FLINK-2490][FIX]Remove the retryForever check in function streamFromSocket --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user HuangWHWHW commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-128307751 Yes, I understand you. But I think the retryForever is necessary. Maybe there is a bug that make the retryForever not working. I`ll get another fix after the CI. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-128301542 if you remove that check, retryForever is unused and can be removed completely. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---