Fabian,

Thanks a lot for your continued help! I really appreciate it.

Sent from Phone.
> On May 8, 2018, at 03:06, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Dhruv,
>
> The changes look good to me.
>
> Best, Fabian
>
> 2018-05-08 5:37 GMT+02:00 Dhruv Kumar <gargdhru...@gmail.com>:
>> Thanks a lot, Fabian, for your response.
>>
>> What I understand is that if I write my own SourceFunction such that it
>> handles the "end of stream" record and makes the source exit from the run()
>> method, the Flink program will terminate.
>>
>> I have been using SocketTextStreamFunction till now.
>> So, I duplicated the SocketTextStreamFunction class into another class named
>> CustomSocketTextStreamFunction, which is exactly the same as
>> SocketTextStreamFunction except for one change in the run() method. The change
>> is highlighted in BOLD below. Can you take a look and let me know whether this
>> will work and whether it will have much of a performance impact? I tested it
>> locally on my machine and it seems to work fine, but I want to make sure that
>> it won’t have any side effects, race conditions, etc.
>>
>> ```
>> @Override
>> public void run(SourceContext<String> ctx) throws Exception {
>>     final StringBuilder buffer = new StringBuilder();
>>     long attempt = 0;
>>
>>     while (isRunning) {
>>
>>         try (Socket socket = new Socket()) {
>>             currentSocket = socket;
>>
>>             LOG.info("Custom: Connecting to server socket " + hostname + ':' + port);
>>             socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
>>             BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
>>
>>             char[] cbuf = new char[8192];
>>             int bytesRead;
>>             while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
>>                 buffer.append(cbuf, 0, bytesRead);
>>                 int delimPos;
>>                 while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
>>                     String record = buffer.substring(0, delimPos);
>>                     // The one change to run() described above: stop the source on the "END" record
>>                     if (record.equals("END")) {
>>                         LOG.info("End of stream encountered");
>>                         isRunning = false;
>>                         buffer.delete(0, delimPos + delimiter.length());
>>                         break;
>>                     }
>>                     // truncate trailing carriage return
>>                     if (delimiter.equals("\n") && record.endsWith("\r")) {
>>                         record = record.substring(0, record.length() - 1);
>>                     }
>>                     ctx.collect(record);
>>                     buffer.delete(0, delimPos + delimiter.length());
>>                 }
>>             }
>>         }
>>
>>         // if we dropped out of this loop due to an EOF, sleep and retry
>>         if (isRunning) {
>>             attempt++;
>>             if (maxNumRetries == -1 || attempt < maxNumRetries) {
>>                 LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
>>                 Thread.sleep(delayBetweenRetries);
>>             }
>>             else {
>>                 // this should probably be here, but some examples expect simple exits of the stream source
>>                 // throw new EOFException("Reached end of stream and reconnects are not enabled.");
>>                 break;
>>             }
>>         }
>>     }
>>
>>     // collect trailing data
>>     if (buffer.length() > 0) {
>>         ctx.collect(buffer.toString());
>>     }
>> }
>> ```
>>
>>
>> --------------------------------------------------
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me
>>
>>> On May 7, 2018, at 11:04, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Flink will automatically stop the execution of a DataStream program once
>>> all sources have finished providing data, i.e., when all SourceFunctions
>>> have returned from the run() method.
>>> The DeserializationSchema.isEndOfStream() method can be used to tell a
>>> built-in SourceFunction such as a KafkaConsumer that it should leave the
>>> run() method.
>>> If you implement your own SourceFunction, you can leave run() after you
>>> have ingested all data.
>>>
>>> Note that Flink won't wait for all processing-time timers but will
>>> immediately shut down the program after the last in-flight record has been
>>> processed.
>>> Event-time timers will be handled because each source emits a
>>> Long.MAX_VALUE watermark after it has emitted its last record.
>>>
>>> Best, Fabian
>>>
>>> 2018-05-07 17:18 GMT+02:00 Dhruv Kumar <gargdhru...@gmail.com>:
>>>> I notice that there is a DeserializationSchema in
>>>> org.apache.flink.api.common.serialization which has a function
>>>> isEndOfStream, but I am not sure if I can use it in my use case.
>>>>
>>>> --------------------------------------------------
>>>> Dhruv Kumar
>>>> PhD Candidate
>>>> Department of Computer Science and Engineering
>>>> University of Minnesota
>>>> www.dhruvkumar.me
>>>>
>>>>> On May 7, 2018, at 06:18, Dhruv Kumar <gargdhru...@gmail.com> wrote:
>>>>>
>>>>> Hi
>>>>>
>>>>> Is there a way I can capture the end of stream signal for streams which
>>>>> are replayed from historical data? I need the end of stream signal to
>>>>> tell the Flink program to finish its execution.
>>>>>
>>>>> Below is the use case in detail:
>>>>> 1. An independent log replayer program sends the records to a socket
>>>>>    (identified by ip address and port).
>>>>> 2. Flink program reads the incoming records via socketTextStream from the
>>>>>    above mentioned socket, applies a KeyBy operator on the incoming records
>>>>>    and then does some processing, finally writing them to another socket.
>>>>>
>>>>> How do I tell the Flink program to finish its execution? Is there any
>>>>> information which I can add to the records while they are sent from the
>>>>> replayer program and which can be parsed when the records arrive inside
>>>>> the Flink program?
>>>>>
>>>>> Let me know if anything is not clear.
>>>>>
>>>>> Thanks
>>>>>
>>>>> --------------------------------------------------
>>>>> Dhruv Kumar
>>>>> PhD Candidate
>>>>> Department of Computer Science and Engineering
>>>>> University of Minnesota
>>>>> www.dhruvkumar.me
>>>>>
>>>>
>>>
>>
>
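
For anyone who wants to try the end-of-stream handling discussed in this thread without a Flink cluster, here is a minimal standalone sketch of just the record-splitting logic. It is not Flink code and not the CustomSocketTextStreamFunction from the thread: the class name EndMarkerSplitter, the method readUntilEnd, and the Consumer-based sink are hypothetical names introduced for illustration, and a plain java.io.Reader stands in for the socket.

```java
import java.io.IOException;
import java.io.Reader;
import java.util.function.Consumer;

// Hypothetical helper (not part of Flink): splits a character stream into
// delimiter-separated records and stops at an agreed end-of-stream marker.
final class EndMarkerSplitter {

    private EndMarkerSplitter() {}

    /**
     * Passes each record read from {@code in} to {@code sink}.
     * Returns true if {@code endMarker} was seen, false if the reader hit EOF first.
     */
    static boolean readUntilEnd(Reader in, String delimiter, String endMarker,
                                Consumer<String> sink) throws IOException {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        StringBuilder buffer = new StringBuilder();
        char[] cbuf = new char[8192];
        int charsRead;
        while ((charsRead = in.read(cbuf)) != -1) {
            buffer.append(cbuf, 0, charsRead);
            int delimPos;
            while ((delimPos = buffer.indexOf(delimiter)) != -1) {
                String record = buffer.substring(0, delimPos);
                buffer.delete(0, delimPos + delimiter.length());
                // Strip a trailing carriage return before comparing, so that
                // an "END\r\n" line is also recognised (assumption of this sketch).
                if (delimiter.equals("\n") && record.endsWith("\r")) {
                    record = record.substring(0, record.length() - 1);
                }
                if (record.equals(endMarker)) {
                    // Anything already buffered after the marker is discarded here.
                    return true;
                }
                sink.accept(record);
            }
        }
        // EOF without a marker: emit whatever is left as a final record.
        if (buffer.length() > 0) {
            sink.accept(buffer.toString());
        }
        return false;
    }
}
```

Calling EndMarkerSplitter.readUntilEnd(new StringReader("a\nb\r\nEND\nc\n"), "\n", "END", list::add) collects "a" and "b" and returns true. The sketch deliberately differs from the quoted run() method in two places, which may or may not matter for a given replayer: as far as I can tell, the quoted version compares against "END" before the carriage return is stripped, and any characters already buffered after the END record are still emitted there by the "collect trailing data" step.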