Re: Streaming. Cannot get socketTextStream to receive anything.
Hi TD,

You are right: I did not include \n to delimit the string being flushed. That was the reason.

Is there a way for me to define the delimiter, such as SOH or ETX instead of \n?

Regards,
kytay

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p10558.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
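On the delimiter question: judging only from the snippets quoted in this thread, socketTextStream yields newline-delimited strings, while socketStream takes a caller-supplied function over the raw InputStream. A minimal sketch of the splitting logic in plain Java follows, assuming a single-character delimiter such as SOH (\u0001) or ETX (\u0003). The names DelimitedRecords and split are hypothetical, and how the result is handed to socketStream depends on the Spark version in use.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical helper, not part of Spark: splits a byte stream into records
// terminated by an arbitrary single-character delimiter.
class DelimitedRecords {
    static final char SOH = '\u0001';
    static final char ETX = '\u0003';

    /** Lazily reads records; each hasNext() blocks until a delimiter or end of stream. */
    static Iterator<String> split(InputStream in, final char delimiter) {
        final Reader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8));
        return new Iterator<String>() {
            private String pending;   // record read but not yet returned
            private boolean done;     // true once end of stream was reached

            @Override
            public boolean hasNext() {
                if (pending == null && !done) {
                    pending = readRecord();
                }
                return pending != null;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String record = pending;
                pending = null;
                return record;
            }

            private String readRecord() {
                StringBuilder sb = new StringBuilder();
                try {
                    int c;
                    while ((c = reader.read()) != -1) {
                        if (c == delimiter) {
                            return sb.toString();
                        }
                        sb.append((char) c);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                done = true;
                // Hand back a trailing, unterminated record if there is one.
                return sb.length() > 0 ? sb.toString() : null;
            }
        };
    }

    public static void main(String[] args) {
        byte[] data = "first record\u0003second record\u0003"
                .getBytes(StandardCharsets.UTF_8);
        Iterator<String> records = split(new ByteArrayInputStream(data), ETX);
        while (records.hasNext()) {
            System.out.println(records.next());
        }
    }
}
```

A converter of the shape shown in #2 of the related socketStream post (a Function from InputStream to Iterable<String>) could presumably return an Iterable whose iterator() hands back this iterator, so that records are read lazily while the connection stays open. That wiring is an assumption and is not shown here.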
Re: Streaming. Cannot get socketTextStream to receive anything.
Hi Akhil Das,

Thanks. I tried the code, and it works.

The problem is in my socket code, which is not flushing the content out. With the test tool, Hercules, I have to close the socket connection to flush the content out.

I am going to troubleshoot why nc works while my code and the test tool don't.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9576.html
Re: Streaming. Cannot get socketTextStream to receive anything.
Hi Tobias,

I have been using local[4] to test.

My problem is likely caused by the TCP host server that I am trying to emulate (although I am not sure at the moment :D). I was trying to emulate the TCP host so that it sends out messages.

The first way I tried was a TCP tool called Hercules. The second was to write simple socket code that sends a message at an interval, like the one shown in #2 of my first post.

I suspect it doesn't work because the messages are not flushed, so no message was received by Spark Streaming.

I think I will need to do more testing to understand the behavior. I am currently not sure why nc -lk works while the other tools and code I am testing with do not.

Regards.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9588.html
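Going by the resolution reported at the top of this thread (the missing \n), nc works because every line typed into it ends with a newline, which is what a line-oriented reader waits for. A minimal sender sketch under that assumption follows: each message is terminated with '\n' and flushed. The flush matters mostly when the stream is wrapped in a buffered writer. The class name LineSender is hypothetical, and main uses a loopback connection with a plain readLine() client standing in for the Spark receiver.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical test sender: newline-terminated messages, flushed one by one.
class LineSender {
    /** Writes one message followed by '\n' and flushes it immediately. */
    static void sendLine(OutputStream out, String message) throws IOException {
        out.write((message + "\n").getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port; a real test would use a fixed one.
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket receiver = new Socket(loopback, server.getLocalPort());
             Socket connection = server.accept()) {
            BufferedReader in = new BufferedReader(new InputStreamReader(
                    receiver.getInputStream(), StandardCharsets.UTF_8));
            for (int i = 0; i < 3; i++) {
                sendLine(connection.getOutputStream(), "Sending... data... " + i);
                // readLine() returns only because the message ended with '\n'.
                System.out.println("received: " + in.readLine());
            }
        }
    }
}
```

Without the trailing "\n", the readLine() call on the receiving side would keep waiting until the connection is closed, which matches the Hercules behavior described above.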
Re: Streaming. Cannot get socketTextStream to receive anything.
Hi Akhil Das,

I have tried the nc -lk command too.

I was hoping that

    System.out.println("Print text: " + arg0);

would print whenever a stream is processed, that is, when lines.flatMap(...) is called. But in my test with nc -lk, nothing is printed on the console at all.

==

To check whether the nc tool is working, I also tested it with the Hercules TCP client test tool, and it works fine.

So the question goes back to why

    JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String arg0) throws Exception {
                System.out.println("Print text: " + arg0);
                return Arrays.asList(arg0.split(" "));
            }
        });

is not printing the text I am sending through nc -lk.

===

Is there any other way to test whether socketTextStream(...) is working?

Regards.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9409.html
Re: Streaming. Cannot get socketTextStream to receive anything.
I think I should be seeing every line of text that I type into the nc command.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9410.html
Getting Persistent Connection using socketStream?
Hi,

I am trying out a simple piece of code by writing my own JavaNetworkCount app to test Spark Streaming.

Here are the two versions of the code.

    // #1
    JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1", );

    // #2
    JavaReceiverInputDStream<String> lines = sctx.socketStream(
        "127.0.0.1", ,
        new Function<InputStream, Iterable<String>>() {
            @Override
            public Iterable<String> call(InputStream arg0) throws Exception {
                // TODO Auto-generated method stub
                if (arg0 != null)
                    System.out.println("CALL is called...");

                BufferedReader reader = new BufferedReader(new InputStreamReader(arg0));
                ArrayList<String> list = new ArrayList<String>();
                while (reader.ready()) {
                    String linetext = reader.readLine();
                    System.out.println(linetext);
                    list.add(linetext);
                }

                if (list.size() > 0)
                    System.out.println("ArrayList is not empty.");
                return list;
            }
        },
        StorageLevel.MEMORY_AND_DISK_SER_2()
    );

I am writing #2 to test some other issues I am facing, where the text stream from the TCP host is not received, but that is not my first concern.

What I am concerned about is this: with .socketTextStream(), the code manages to keep a persistent connection to the TCP host, while with #2, which uses .socketStream(), I am unable to maintain a persistent connection.

The following is the log printed when I run #2:

    14/07/10 01:55:42 INFO ReceiverSupervisorImpl: Receiver started again
    14/07/10 01:55:43 INFO SocketReceiver: Connected to 127.0.0.1:
    CALL is called...
    14/07/10 01:55:43 INFO SocketReceiver: Stopped receiving
    14/07/10 01:55:43 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Retrying connecting to 127.0.0.1:
    14/07/10 01:55:43 INFO SocketReceiver: Closed socket to 127.0.0.1:
    14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Retrying connecting to 127.0.0.1::
    14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Called receiver onStop
    14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Deregistering receiver 0
    14/07/10 01:55:43 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to 127.0.0.1:
    14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Stopped receiver 0
    14/07/10 01:55:45 INFO ReceiverTracker: Stream 0 received 0 blocks
    14/07/10 01:55:45 INFO JobScheduler: Added jobs for time 1404984705000 ms
    14/07/10 01:55:45 INFO ReceiverSupervisorImpl: Starting receiver again

Log paste: http://pastebin.com/embed_iframe.php?i=KVWEC1kU

I am very new to clustered computing, Hadoop, Spark, and even streaming, so I may not have the entire concept right. May I clarify: is there something wrong in my #2 code? Am I able to achieve the same thing that #1 does?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-Persistent-Connection-using-socketStream-tp9285.html
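A likely explanation for the restart loop in this log, offered as an assumption rather than a confirmed diagnosis: reader.ready() returns false whenever no bytes are available at that instant, so call(...) returns an empty list right after connecting, and the receiver appears to treat the exhausted result as the end of the stream ("Stopped receiving", then "Retrying connecting"). The sketch below reproduces only the ready() part in plain Java, with a pipe standing in for the socket. The class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: shows that a ready()-guarded loop never waits for data.
class ReadyLoopDemo {
    /** Same loop shape as #2: collects only the lines that are already available. */
    static List<String> readWhileReady(InputStream in) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8));
        List<String> lines = new ArrayList<>();
        while (reader.ready()) {   // false as soon as nothing is available
            lines.add(reader.readLine());
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        PipedOutputStream sender = new PipedOutputStream();
        PipedInputStream receiver = new PipedInputStream(sender);

        // Connected, but the sender has not written anything yet:
        // the loop body never runs and the list comes back empty.
        System.out.println("before write: " + readWhileReady(receiver));

        sender.write("Sending... data... 0\n".getBytes(StandardCharsets.UTF_8));
        sender.flush();

        // Now one complete line is available, so it is picked up.
        System.out.println("after write: " + readWhileReady(receiver));
    }
}
```

A converter that instead blocks on readLine() until the stream ends, and returns its lines lazily, would not hand back an empty result while the connection is merely idle.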
Re: Getting Persistent Connection using socketStream?
Hi TD,

Thanks. I have a problem understanding the code on GitHub, in object SocketReceiver.bytesToLines(...):

https://github.com/apache/spark/blob/095b5182536a43e2ae738be93294ee5215d86581/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala

    private[streaming]
    object SocketReceiver {

      /**
       * This methods translates the data from an inputstream (say, from a socket)
       * to '\n' delimited strings and returns an iterator to access the strings.
       */
      def bytesToLines(inputStream: InputStream): Iterator[String] = {
        val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
        new NextIterator[String] {
          protected override def getNext() = {
            val nextValue = dataInputStream.readLine()
            if (nextValue == null) {
              finished = true
            }
            nextValue
          }

          protected override def close() {
            dataInputStream.close()
          }
        }
      }
    }

Sorry, I will need some time to digest this. I do not know Scala at the moment, but I understand what you mean about the implementation.

Thanks.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-Persistent-Connection-using-socketStream-tp9285p9380.html
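For readers who, like the poster, do not know Scala: a rough Java rendering of the bytesToLines logic quoted above, as a sketch only. It uses a plain java.util.Iterator in place of Spark's NextIterator, and the class name BytesToLinesSketch is hypothetical. The point to notice is that readLine() blocks until a line terminator arrives or the stream ends, and only a null result marks the iterator as finished.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical Java counterpart of the quoted Scala helper; not Spark code.
class BytesToLinesSketch {
    static Iterator<String> bytesToLines(InputStream inputStream) {
        final BufferedReader reader = new BufferedReader(
                new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        return new Iterator<String>() {
            private String nextValue;   // line fetched but not yet handed out
            private boolean finished;   // set once readLine() has returned null

            @Override
            public boolean hasNext() {
                if (nextValue == null && !finished) {
                    try {
                        // Blocks until a full line is available or the stream ends.
                        nextValue = reader.readLine();
                        if (nextValue == null) {
                            finished = true;
                            reader.close();
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
                return nextValue != null;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String line = nextValue;
                nextValue = null;
                return line;
            }
        };
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(
                "first\nsecond\n".getBytes(StandardCharsets.UTF_8));
        Iterator<String> lines = bytesToLines(in);
        while (lines.hasNext()) {
            System.out.println(lines.next());
        }
    }
}
```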
Streaming. Cannot get socketTextStream to receive anything.
Hi,

I am learning Spark Streaming and am trying out the JavaNetworkCount example.

#1 - This is the code I wrote

    JavaStreamingContext sctx = new JavaStreamingContext("local", appName, new Duration(5000));
    JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1", );

    JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String arg0) throws Exception {
                System.out.println("Print text: " + arg0);
                return Arrays.asList(arg0.split(" "));
            }
        });

#2 - This is the socket code I am using

    import java.io.BufferedReader;
    import java.io.DataOutputStream;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class TestTcpServer {
        public static void main(String argv[]) throws Exception {
            String clientSentence;
            String capitalizedSentence;
            ServerSocket welcomeSocket = new ServerSocket();
            int i = 0;

            while (true) {
                Socket connectionSocket = welcomeSocket.accept();
                BufferedReader inFromClient = new BufferedReader(
                    new InputStreamReader(connectionSocket.getInputStream()));
                DataOutputStream outToClient =
                    new DataOutputStream(connectionSocket.getOutputStream());

                while (true) {
                    String sendingStr = "Sending... data... " + i;
                    outToClient.writeBytes(sendingStr);
                    System.out.println(sendingStr);
                    i++;
                    Thread.sleep(3000);
                }
            }
        }
    }

What I am trying to do is get the JavaNetworkCount in #1 to print all the text it receives, but so far I have failed to achieve that.

I have been using Hercules Setup (http://www.hw-group.com/products/hercules/details_en.html) to simulate a TCP server, as well as the simple ServerSocket code in #2, but I am not seeing any text printed on the console.

Is public Iterable<String> call(String arg0) throws Exception being called every 5 seconds?

The console log is at http://pastebin.com/THzdzGhg

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382.html