Fabian, thanks a lot for your continued help! I really appreciate it.

Sent from Phone.

> On May 8, 2018, at 03:06, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Dhruv,
> 
> The changes look good to me.
> 
> Best, Fabian
> 
> 2018-05-08 5:37 GMT+02:00 Dhruv Kumar <gargdhru...@gmail.com>:
>> Thanks a lot for your response, Fabian.
>> 
>> What I understand is that if I write my own SourceFunction that handles 
>> the "end of stream" record and returns from its run() method, the Flink 
>> program will terminate.
>> 
>> I have been using SocketTextStreamFunction until now.
>> So I copied the SocketTextStreamFunction class into a new class named 
>> CustomSocketTextStreamFunction, which is identical to 
>> SocketTextStreamFunction except for one change in the run() method: the 
>> check for the "END" record in the inner loop below. Can you take a look and 
>> let me know whether this will work without much performance impact? I tested 
>> it locally on my machine and it seems to work fine, but I want to make sure 
>> it won't have any side effects, race conditions, etc.
>> 
>> ```
>>     @Override
>>     public void run(SourceContext<String> ctx) throws Exception {
>>         final StringBuilder buffer = new StringBuilder();
>>         long attempt = 0;
>> 
>>         while (isRunning) {
>> 
>>             try (Socket socket = new Socket()) {
>>                 currentSocket = socket;
>> 
>>                 LOG.info("Custom: Connecting to server socket " + hostname + ':' + port);
>>                 socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
>>                 BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
>> 
>>                 char[] cbuf = new char[8192];
>>                 int bytesRead;
>>                 while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
>>                     buffer.append(cbuf, 0, bytesRead);
>>                     int delimPos;
>>                     while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
>>                         String record = buffer.substring(0, delimPos);
>>                         if (record.equals("END")) {
>>                             LOG.info("End of stream encountered");
>>                             isRunning = false;
>>                             buffer.delete(0, delimPos + delimiter.length());
>>                             break;
>>                         }
>>                         // truncate trailing carriage return
>>                         if (delimiter.equals("\n") && record.endsWith("\r")) {
>>                             record = record.substring(0, record.length() - 1);
>>                         }
>>                         ctx.collect(record);
>>                         buffer.delete(0, delimPos + delimiter.length());
>>                     }
>>                 }
>>             }
>> 
>>             // if we dropped out of this loop due to an EOF, sleep and retry
>>             if (isRunning) {
>>                 attempt++;
>>                 if (maxNumRetries == -1 || attempt < maxNumRetries) {
>>                     LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
>>                     Thread.sleep(delayBetweenRetries);
>>                 }
>>                 else {
>>                     // this should probably be here, but some examples expect simple exits of the stream source
>>                     // throw new EOFException("Reached end of stream and reconnects are not enabled.");
>>                     break;
>>                 }
>>             }
>>         }
>> 
>>         // collect trailing data
>>         if (buffer.length() > 0) {
>>             ctx.collect(buffer.toString());
>>         }
>>     }
>> ```
>> 
>> 
>> --------------------------------------------------
>> Dhruv Kumar
>> PhD Candidate
>> Department of Computer Science and Engineering
>> University of Minnesota
>> www.dhruvkumar.me
>> 
>>> On May 7, 2018, at 11:04, Fabian Hueske <fhue...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> Flink automatically stops the execution of a DataStream program once all 
>>> sources have finished providing data, i.e., when every SourceFunction has 
>>> returned from its run() method.
>>> The DeserializationSchema.isEndOfStream() method can be used to tell a 
>>> built-in SourceFunction such as a KafkaConsumer that it should leave the 
>>> run() method.
>>> If you implement your own SourceFunction, you can leave run() after you 
>>> have ingested all data.
>>> 
>>> Note that Flink won't wait for pending processing-time timers but will 
>>> shut down the program immediately after the last in-flight record has 
>>> been processed.
>>> Event-time timers will be handled because each source emits a 
>>> Long.MAX_VALUE watermark after it has emitted its last record.
>>> 
>>> Best, Fabian
>>> 
>>> 2018-05-07 17:18 GMT+02:00 Dhruv Kumar <gargdhru...@gmail.com>:
>>>> I noticed that there is a DeserializationSchema interface in 
>>>> org.apache.flink.api.common.serialization with an isEndOfStream method, 
>>>> but I am not sure whether I can use it for my use case.
>>>> 
>>>>> On May 7, 2018, at 06:18, Dhruv Kumar <gargdhru...@gmail.com> wrote:
>>>>> 
>>>>> Hi
>>>>> 
>>>>> Is there a way to capture an end-of-stream signal for streams that are 
>>>>> replayed from historical data? I need the end-of-stream signal to tell 
>>>>> the Flink program to finish its execution.
>>>>> 
>>>>> Below is the use case in detail:
>>>>> 1. An independent log replayer program sends the records to a socket 
>>>>> (identified by IP address and port).
>>>>> 2. The Flink program reads the incoming records from that socket via 
>>>>> socketTextStream, applies a keyBy operator to them, does some 
>>>>> processing, and finally writes them to another socket.
>>>>> 
>>>>> How do I tell the Flink program to finish its execution? Is there any 
>>>>> information which I can add to the records while they are sent from the 
>>>>> replayer program and which can be parsed when the records arrive inside 
>>>>> the Flink program?
>>>>> 
>>>>> Let me know if anything is not clear.
>>>>> 
>>>>> Thanks
>>>>> 
>>>> 
>>> 
>> 
> 

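For readers following the thread: the record-splitting logic from the run() method quoted above can be tried out without Flink or a socket. The following is a minimal sketch under stated assumptions, not the code from SocketTextStreamFunction. The class name EndMarkerSplitter and the method readUntilMarker are hypothetical, the input comes from any java.io.Reader, and records are returned in a list instead of being passed to ctx.collect(). Unlike the quoted code, the sketch strips a trailing carriage return before comparing against the marker and discards anything that arrives after the marker; both are choices made for this illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; it is not part of Flink.
class EndMarkerSplitter {

    /**
     * Reads delimiter-separated records from the reader and returns them in order.
     * Stops as soon as a record equal to endMarker is seen. The marker itself and
     * anything after it are not returned.
     */
    static List<String> readUntilMarker(Reader reader, String delimiter, String endMarker)
            throws IOException {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        char[] cbuf = new char[8192];
        int charsRead;
        boolean running = true;

        while (running && (charsRead = reader.read(cbuf)) != -1) {
            buffer.append(cbuf, 0, charsRead);
            int delimPos;
            while ((delimPos = buffer.indexOf(delimiter)) != -1) {
                String record = buffer.substring(0, delimPos);
                buffer.delete(0, delimPos + delimiter.length());
                // Assumption: strip the trailing carriage return first, so that
                // a marker sent with "\r\n" line endings is still recognized.
                if (delimiter.equals("\n") && record.endsWith("\r")) {
                    record = record.substring(0, record.length() - 1);
                }
                if (record.equals(endMarker)) {
                    running = false;
                    buffer.setLength(0); // assumption: drop data after the marker
                    break;
                }
                records.add(record);
            }
        }

        // The input ended without a marker: emit trailing data that has no delimiter.
        if (running && buffer.length() > 0) {
            records.add(buffer.toString());
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        List<String> out =
                readUntilMarker(new StringReader("a\nb\r\nEND\nc\n"), "\n", "END");
        System.out.println(out); // prints [a, b]
    }
}
```

In a real source, the point where the sketch sets running to false corresponds to setting isRunning to false and returning from run(), which is what lets Flink finish the job.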