[ https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710895#comment-14710895 ]
ASF GitHub Bot commented on FLINK-2490: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/992#discussion_r37844142 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import static java.lang.Thread.sleep; +import static org.junit.Assert.*; + +import java.net.ServerSocket; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}. + */ +public class SocketTextStreamFunctionTest{ + + final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); + private final String host = "127.0.0.1"; + + SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() { + public String result; + + @Override + public void collect(String element) { + result = element; + } + + @Override + public String toString() { + return this.result; + } + + @Override + public void collectWithTimestamp(String element, long timestamp) { + + } + + @Override + public void emitWatermark(Watermark mark) { + + } + + @Override + public Object getCheckpointLock() { + return null; + } + + @Override + public void close() { + + } + }; + + public SocketTextStreamFunctionTest() { + } + + class SocketSource extends Thread { + + SocketTextStreamFunction socketSource; + + public SocketSource(ServerSocket serverSo, int maxRetry) { + this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry); + } + + public void run() { + try { + this.socketSource.open(new Configuration()); + this.socketSource.run(ctx); + }catch(Exception e){ + error.set(e); + } + } + + public void cancel(){ + this.socketSource.cancel(); + } + } + + @Test + public void testSocketSourceRetryForever() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, -1); + source.start(); + + int count = 0; + Socket channel; + while (count < 100) { + channel = serverSo.accept(); + count++; + channel.close(); + } + sleep(10000); + source.cancel(); + + if (error.get() != null) { + Throwable t = error.get(); + t.printStackTrace(); + fail("Error in spawned thread: " + t.getMessage()); + } + + assertEquals(100, count); + } + + @Test + public void testSocketSourceRetryTenTimes() throws Exception{ + ServerSocket serverSo = new ServerSocket(0); + SocketSource source = new SocketSource(serverSo, 10); + + assertEquals(0, source.socketSource.retries); + + source.start(); + + Socket channel; + channel = serverSo.accept(); + channel.close(); + serverSo.close(); + while(source.socketSource.retries < 10){ + long lastRetry = source.socketSource.retries; + sleep(10000); + assertTrue(source.socketSource.retries > lastRetry); --- End diff -- Yes, exactly. Please see my previous edit. > Remove unwanted boolean check in function > SocketTextStreamFunction.streamFromSocket > ----------------------------------------------------------------------------------- > > Key: FLINK-2490 > URL: https://issues.apache.org/jira/browse/FLINK-2490 > Project: Flink > Issue Type: Bug > Components: Streaming > Affects Versions: 0.10 > Reporter: Huang Wei > Priority: Minor > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > -- This message was sent by Atlassian JIRA (v6.3.4#6332)