Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-23 Thread kytay
Hi TD

You are right: I did not append \n to delimit the strings I flushed. That
was the reason.

Is there a way for me to define the delimiter, such as SOH or ETX instead
of \n?

Regards
kytay
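
On the delimiter question, here is a minimal sketch of one possible approach, not a confirmed Spark feature: split the raw stream yourself with java.util.Scanner, which accepts an arbitrary delimiter and hands out tokens as an Iterator<String>. The class name EtxRecords and the method records(...) are made up for illustration. Whether such an iterator can be plugged into the converter passed to socketStream(...) is an assumption that would need testing against a live socket.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
import java.util.regex.Pattern;

// Hypothetical helper: names are illustrative, not part of Spark.
public class EtxRecords {

    // ETX (end of text), ASCII code 3.
    public static final String ETX = "\u0003";

    // Returns an iterator over the records found between delimiters.
    // Scanner pulls input from the stream as tokens are requested.
    public static Iterator<String> records(InputStream in, String delimiter) {
        // Pattern.quote treats the delimiter as a literal, not a regex.
        return new Scanner(in, "UTF-8").useDelimiter(Pattern.quote(delimiter));
    }

    public static void main(String[] args) {
        byte[] bytes = ("first" + ETX + "second" + ETX)
                .getBytes(StandardCharsets.UTF_8);
        Iterator<String> it = records(new ByteArrayInputStream(bytes), ETX);
        while (it.hasNext()) {
            System.out.println(it.next()); // prints "first", then "second"
        }
    }
}
```

The same helper would work for SOH by passing "\u0001" as the delimiter.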



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p10558.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-13 Thread kytay
Hi Akhil Das

Thanks.

I tried the code, and it works.

There is a problem with my socket code: it is not flushing the content
out. With the test tool, Hercules, I have to close the socket connection
to flush the content out.

I am going to troubleshoot why nc works while my code and the test tool
don't.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9576.html


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-13 Thread kytay
Hi Tobias

I have been using local[4] to test.
My problem is likely caused by the TCP host server that I am trying to
emulate. I was trying to emulate the TCP host to send out messages.
(although I am not sure at the moment :D)

The first way I tried was to use a TCP tool called Hercules.

The second way was to write simple socket code to send a message at an
interval, like the one shown in #2 of my first post. I suspect the reason
it doesn't work is that the messages are not flushed, so no message is
received by Spark Streaming.

I think I will need to do more testing to understand the behavior. I am
currently not sure why nc -lk works while the other tools and code I am
testing with do not.

Regards.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9588.html


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-11 Thread kytay
Hi Akhil Das

I have tried the 
nc -lk  
command too.

I was expecting System.out.println("Print text: " + arg0); to print
whenever a batch is processed by lines.flatMap(...).

But in my test with nc -lk , nothing is printed on the console at
all.

==

To check whether the nc tool is working, I have also tested it with the
Hercules TCP client test tool, and it works fine.

So now the question goes back to why

JavaDStream<String> words = lines.flatMap(
    new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String arg0) throws Exception {
            System.out.println("Print text: " + arg0);
            return Arrays.asList(arg0.split(" "));
        }
    });

is not printing the text I am sending through nc -lk .

===

Is there any other way to test if socketTextStream(...) is working?

Regards.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9409.html


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-11 Thread kytay
I think I should be seeing any line of text that I have typed in the nc
command.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9410.html


Getting Persistent Connection using socketStream?

2014-07-10 Thread kytay
Hi

I am trying out a simple piece of code by writing my own JavaNetworkCount
app to test Spark Streaming.

Here are the two sets of code.


// #1
JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1",
    );


// #2
JavaReceiverInputDStream<String> lines = sctx.socketStream(
    "127.0.0.1",
    ,
    new Function<InputStream, Iterable<String>>() {

        @Override
        public Iterable<String> call(InputStream arg0) throws Exception {
            // TODO Auto-generated method stub
            if (arg0 != null)
                System.out.println("CALL is called...");

            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(arg0));
            ArrayList<String> list = new ArrayList<String>();
            while (reader.ready())
            {
                String linetext = reader.readLine();
                System.out.println(linetext);
                list.add(linetext);
            }

            if (list.size() > 0)
                System.out.println("ArrayList is not empty.");

            return list;
        }

    },
    StorageLevel.MEMORY_AND_DISK_SER_2()
);

I wrote #2 to test some other issues that I am facing, where the text
stream from the TCP host is not received, but that is not my first
concern.

What I am concerned about is this:

Using .socketTextStream(), the code manages to keep a persistent connection
to the TCP host, while with the #2 code using .socketStream(), I am unable
to maintain a persistent connection.

The following is the log printed when I run #2

14/07/10 01:55:42 INFO ReceiverSupervisorImpl: Receiver started again
14/07/10 01:55:43 INFO SocketReceiver: Connected to 127.0.0.1:
CALL is called...
14/07/10 01:55:43 INFO SocketReceiver: Stopped receiving
14/07/10 01:55:43 WARN ReceiverSupervisorImpl: Restarting receiver with
delay 2000 ms: Retrying connecting to 127.0.0.1:
14/07/10 01:55:43 INFO SocketReceiver: Closed socket to 127.0.0.1:
14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Stopping receiver with
message: Restarting receiver with delay 2000ms: Retrying connecting to
127.0.0.1:: 
14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Called receiver onStop
14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Deregistering receiver 0
14/07/10 01:55:43 ERROR ReceiverTracker: Deregistered receiver for stream 0:
Restarting receiver with delay 2000ms: Retrying connecting to 127.0.0.1:
14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Stopped receiver 0
14/07/10 01:55:45 INFO ReceiverTracker: Stream 0 received 0 blocks
14/07/10 01:55:45 INFO JobScheduler: Added jobs for time 1404984705000 ms
14/07/10 01:55:45 INFO ReceiverSupervisorImpl: Starting receiver again

http://pastebin.com/embed_iframe.php?i=KVWEC1kU

I am very new to clustered computing, Hadoop, Spark, and even streaming, so
I may not have the entire concept right.

So may I ask: is there something wrong in my #2 code? Am I able to achieve
the same thing that #1 does?
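
One plausible reading of the log above, offered as an assumption rather than a confirmed diagnosis: the loop in #2 only reads while reader.ready() is true, so on a freshly opened connection with nothing buffered yet it returns an empty list at once, and the receiver then appears to treat the exhausted Iterable as the end of the stream ("CALL is called..." is followed directly by "Stopped receiving"). The sketch below reproduces just the loop behaviour without Spark, using a pipe in place of a socket. The class name ReadyLoopDemo is made up for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; a pipe stands in for the TCP connection.
public class ReadyLoopDemo {

    // Same loop shape as the converter in #2: it reads only the lines
    // that are already available and never waits for more.
    public static List<String> drainWhileReady(BufferedReader reader)
            throws IOException {
        List<String> list = new ArrayList<>();
        while (reader.ready()) {
            list.add(reader.readLine());
        }
        return list;
    }

    public static void main(String[] args) throws IOException {
        PipedOutputStream sender = new PipedOutputStream();
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                new PipedInputStream(sender), StandardCharsets.UTF_8));

        // The connection is open but idle: the loop exits immediately.
        System.out.println(drainWhileReady(reader)); // prints []

        sender.write("a\nb\n".getBytes(StandardCharsets.UTF_8));
        sender.flush();

        // Only now is there something to read.
        System.out.println(drainWhileReady(reader)); // prints [a, b]
    }
}
```

If this reading is right, a converter that blocks in readLine() until the peer closes the connection would behave more like #1.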



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-Persistent-Connection-using-socketStream-tp9285.html


Re: Getting Persistent Connection using socketStream?

2014-07-10 Thread kytay
Hi TD

Thanks. 

I have a problem understanding the code on GitHub, in the object method
SocketReceiver.bytesToLines(...):
https://github.com/apache/spark/blob/095b5182536a43e2ae738be93294ee5215d86581/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
  

private[streaming]
object SocketReceiver  {

  /**
   * This methods translates the data from an inputstream (say, from a socket)
   * to '\n' delimited strings and returns an iterator to access the strings.
   */
  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val dataInputStream = new BufferedReader(
      new InputStreamReader(inputStream, "UTF-8"))
    new NextIterator[String] {
      protected override def getNext() = {
        val nextValue = dataInputStream.readLine()
        if (nextValue == null) {
          finished = true
        }
        nextValue
      }

      protected override def close() {
        dataInputStream.close()
      }
    }
  }
}

Sorry, I will need some time to digest this. I do not know Scala at the
moment, but I understand what you mean about the implementation. Thanks.
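
For readers in the same position, here is a rough Java rendering of the idea in bytesToLines, written as a sketch and not as the Spark implementation: an iterator whose hasNext() blocks inside readLine() and only reports the end once the stream is closed. The class name LineIterators is made up for illustration, and NextIterator is replaced here by a plain java.util.Iterator.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical helper class; only the technique mirrors the Scala code.
public class LineIterators {

    // Turns an input stream into a lazy iterator of '\n' delimited lines.
    public static Iterator<String> bytesToLines(InputStream in) {
        final BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8));
        return new Iterator<String>() {
            private String next;       // line fetched ahead, if any
            private boolean finished;  // true once readLine() returned null

            @Override
            public boolean hasNext() {
                if (next == null && !finished) {
                    try {
                        next = reader.readLine(); // blocks until a full line arrives
                        if (next == null) {       // null means end of stream
                            finished = true;
                            reader.close();
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
                return next != null;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String line = next;
                next = null;
                return line;
            }
        };
    }

    public static void main(String[] args) {
        byte[] bytes = "a\nb\n".getBytes(StandardCharsets.UTF_8);
        Iterator<String> it = bytesToLines(new ByteArrayInputStream(bytes));
        while (it.hasNext()) {
            System.out.println(it.next()); // prints "a", then "b"
        }
    }
}
```

The point of the design is that the iterator never ends while the connection is open, which is different from returning a finished list.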



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-Persistent-Connection-using-socketStream-tp9285p9380.html


Streaming. Cannot get socketTextStream to receive anything.

2014-07-10 Thread kytay
Hi

I am learning Spark Streaming, and am trying out the JavaNetworkCount
example.


#1 - This is the code I wrote
JavaStreamingContext sctx = new JavaStreamingContext("local", appName,
    new Duration(5000));
JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1",
    );

JavaDStream<String> words = lines.flatMap(
    new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String arg0) throws Exception {
            System.out.println("Print text: " + arg0);
            return Arrays.asList(arg0.split(" "));
        }
    });

#2 - This is the socket code I am using
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

public class TestTcpServer {

    public static void main(String argv[]) throws Exception
    {
        String clientSentence;
        String capitalizedSentence;
        ServerSocket welcomeSocket = new ServerSocket();

        int i = 0;

        while (true)
        {
            Socket connectionSocket = welcomeSocket.accept();

            BufferedReader inFromClient = new BufferedReader(
                    new InputStreamReader(connectionSocket.getInputStream()));
            DataOutputStream outToClient = new DataOutputStream(
                    connectionSocket.getOutputStream());

            while (true)
            {
                String sendingStr = "Sending... data... " + i;
                outToClient.writeBytes(sendingStr);
                System.out.println(sendingStr);
                i++;
                Thread.sleep(3000);
            }
        }
    }
}

What I am trying to do is get the JavaNetworkCount code in #1 to print all
the text it receives, but so far I have failed to achieve that.

I have been using Hercules Setup
http://www.hw-group.com/products/hercules/details_en.html to simulate a
TCP server, as well as the simple ServerSocket code in #2.
But I am not seeing any text printed on the console.

Is public Iterable<String> call(String arg0) throws Exception being called
every 5 secs?

The console log is at http://pastebin.com/THzdzGhg
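
For reference, the follow-up in this thread traced the problem to the strings not being terminated with \n. Below is a minimal sketch of a test server that ends every message with a newline and flushes it, under these assumptions: the class name LineTcpServer and the method sendLines(...) are made up for illustration, and the port is taken from the first command-line argument because the original port number is not shown above.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical test server; not the original TestTcpServer.
public class LineTcpServer {

    // Sends `count` messages to one client, each terminated by '\n'
    // and flushed, pausing `pauseMillis` between messages.
    public static void sendLines(Socket client, int count, long pauseMillis)
            throws IOException, InterruptedException {
        BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
                client.getOutputStream(), StandardCharsets.UTF_8));
        for (int i = 0; i < count; i++) {
            out.write("Sending... data... " + i);
            out.write('\n'); // the line delimiter a line-based reader waits for
            out.flush();     // push the buffered line onto the socket now
            Thread.sleep(pauseMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the port is passed as the first argument.
        int port = Integer.parseInt(args[0]);
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept()) {
            sendLines(client, Integer.MAX_VALUE, 3000);
        }
    }
}
```

The flush matters here because BufferedWriter holds data back. The original code writes through DataOutputStream directly, so there the missing newline is the more likely culprit.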



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382.html