Re: Race Condition in Streaming Thread

2015-02-27 Thread Tathagata Das
Are you sure the multiple invocations are not from previous runs of the
program?

TD

On Fri, Feb 27, 2015 at 12:16 PM, Nastooh Avessta (navesta) 
nave...@cisco.com wrote:

  Hi

 Under Spark 1.0.0 (standalone, client mode), I am trying to invoke a
 third-party UDP traffic generator from the streaming thread. The excerpt
 is as follows:

 …

 do {
   try {
   p = Runtime.getRuntime().exec(Prog);
   socket.receive(packet);
   output.clear();
   kryo.writeObject(output, packet);
   store(output);

 …

 The program (Prog) has a test to check for an existing instance, e.g.
 [ $(pidof Prog) ] && exit. This code runs fine: the third-party
 application is invoked, data is received and analyzed on the driver, etc.
 The problem arises when I test redundancy and fault tolerance.
 Specifically, when I manually terminate Prog, multiple invocations are
 observed upon recovery. This could be due to multiple threads getting
 through [ $(pidof Prog) ] && exit. However, I was hoping to avoid this
 problem by adding a semaphore, as follows:

 …

 do {
   try {
     sem.acquire();
     p = Runtime.getRuntime().exec(Prog);
     sem.release();
   } catch (IOException ioe) {
     //ioe.printStackTrace();
     break;
   }
   socket.receive(packet);
   //InetAddress returnIPAddress = packet.getAddress(); returnPort = packet.getPort();
   output.clear();
   kryo.writeObject(output, packet);
   store(output);

 …
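
 A note on the excerpt above: Runtime.exec() returns as soon as the child
 process has been spawned, so the semaphore only serializes the spawn calls,
 not the pidof test that each child runs afterwards. One possible
 alternative, sketched here under assumptions and not taken from the thread,
 is to keep the Process handle and start a new child only when the previous
 one has exited. SingleChildGuard, ProcessStarter, forCommand and
 ensureRunning are hypothetical names; Process.isAlive() needs Java 8 or
 later, and the guard only covers children started through the same guard
 object in one JVM.

```java
import java.io.IOException;

/** Hypothetical abstraction: anything that can start the external program. */
interface ProcessStarter {
    Process start() throws IOException;
}

/** Hypothetical helper: starts a child only if the previous one is no longer alive. */
final class SingleChildGuard {
    private final ProcessStarter starter;
    private Process child; // guarded by the monitor of "this"

    SingleChildGuard(ProcessStarter starter) {
        this.starter = starter;
    }

    /** Convenience factory for a real command line, e.g. forCommand("Prog"). */
    static SingleChildGuard forCommand(String... command) {
        return new SingleChildGuard(() -> new ProcessBuilder(command).start());
    }

    /**
     * Check and launch happen under one lock, so two threads calling this
     * on the same guard cannot both start a child.
     *
     * @return true if a new child was started, false if one is still running
     */
    synchronized boolean ensureRunning() throws IOException {
        if (child != null && child.isAlive()) {
            return false; // previous child still running, nothing to do
        }
        child = starter.start();
        return true;
    }
}
```

 In the receive loop, a call such as guard.ensureRunning() would take the
 place of the exec() call. Whether this removes the duplicate launches seen
 on recovery depends on where and how the receiver is restarted, which the
 thread does not establish.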



 However, I am still seeing multiple invocations of Prog upon recovery. I
 have also experimented with the following configuration parameters, to no
 avail:

   sparkConf.set("spark.cores.max", args[1]);
   sparkConf.set("spark.task.cpus", args[2]);
   sparkConf.set("spark.default.parallelism", args[2]);

 with args = {(1,1), (2,1), (1,2), …}

 Any thoughts?

 Cheers,



 *Nastooh Avessta*
 ENGINEER.SOFTWARE ENGINEERING
 nave...@cisco.com
 Phone: +1 604 647 1527

 *Cisco Systems Limited*
 595 Burrard Street, Suite 2123 Three Bentall Centre, PO Box 49121
 VANCOUVER
 BRITISH COLUMBIA
 V7X 1J1
 CA
 Cisco.com http://www.cisco.com/



 Think before you print.

 This email may contain confidential and privileged material for the sole
 use of the intended recipient. Any review, use, distribution or disclosure
 by others is strictly prohibited. If you are not the intended recipient (or
 authorized to receive for the recipient), please contact the sender by
 reply email and delete all copies of this message.

 For corporate legal information go to:
 http://www.cisco.com/web/about/doing_business/legal/cri/index.html

 Cisco Systems Canada Co, 181 Bay St., Suite 3400, Toronto, ON, Canada, M5J
 2T3. Phone: 416-306-7000; Fax: 416-306-7099. *Preferences
 http://www.cisco.com/offer/subscribe/?sid=000478326 - Unsubscribe
 http://www.cisco.com/offer/unsubscribe/?sid=000478327 – Privacy
 http://www.cisco.com/web/siteassets/legal/privacy.html*





Re: Race Condition in Streaming Thread

2015-02-27 Thread Tathagata Das
It wasn't clear from the snippet what's going on. Can you provide the
whole Receiver code?

TD

On Fri, Feb 27, 2015 at 12:37 PM, Nastooh Avessta (navesta) 
nave...@cisco.com wrote:

  I am: I issue killall -9 Prog prior to testing.


RE: Race Condition in Streaming Thread

2015-02-27 Thread Nastooh Avessta (navesta)
Thank you for your time and effort. Here is the code:
---

// Imports were not included in the original post; these are the packages
// the code below appears to rely on.
import java.io.IOException;
import java.net.ConnectException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.concurrent.Semaphore;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;

public final class Multinode extends Receiver<Output> {

  String host = null;
  int portRx = -1;
  int portTx = -1;
  // Intended to serialize launches of the external program
  private final Semaphore sem = new Semaphore(1);

  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("Multinode");
    sparkConf.set("spark.cores.max", args[1]);
    sparkConf.set("spark.task.cpus", args[2]);
    sparkConf.set("spark.default.parallelism", args[2]);
    sparkConf.set("spark.serializer",
        "org.apache.spark.serializer.KryoSerializer");

    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
        new Duration(Integer.parseInt(args[6])));

    JavaReceiverInputDStream<Output> data = ssc.receiverStream(
        new Multinode(args[3],
            Integer.parseInt(args[4]), Integer.parseInt(args[5])));

    // Map each stored buffer to its current write position
    JavaDStream<Integer> udp = data.map(new Function<Output, Integer>() {
      @Override
      public Integer call(Output x) {
        Integer Ret = new Integer(x.position());
        return (Ret);
      }
    });

    udp.print();
    ssc.start();
    ssc.awaitTermination();
  }

  public Multinode(String host_, int portRx_, int portTx_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    portRx = portRx_;
    portTx = portTx_;
  }

  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread() {
      @Override public void run() {
        receive();
      }
    }.start();
  }

  public void onStop() {
    // There is nothing much to do, as the thread calling receive()
    // stops by itself once isStopped() returns true
  }

  /** Open a UDP socket and receive data until the receiver is stopped */
  private void receive() {
    Process p;
    DatagramSocket socket = null;
    byte[] buf = new byte[1400];
    DatagramPacket packet = new DatagramPacket(buf, buf.length);
    Kryo kryo = new Kryo();
    kryo.register(DatagramPacket.class);
    Output output = new Output(2900); // 2*(1400+35)

    try {
      // bind the local UDP socket
      if (socket == null) socket = new DatagramSocket(portRx);
      InetAddress returnIPAddress = InetAddress.getByName(host);
      int returnPort = portTx;

      do {
        try {
          sem.acquire();
          // Prog (the generator's command line) is not defined in the post.
          // It is launched on every iteration and relies on its own pidof
          // test to exit when an instance is already running.
          p = Runtime.getRuntime().exec(Prog);
          // Not reached if exec() throws, so the permit then stays held
          sem.release();
        } catch (IOException ioe) {
          //ioe.printStackTrace();
          break;
        }
        socket.receive(packet);

        output.clear();
        kryo.writeObject(output, packet);
        store(output);
        // echo the packet back to the configured host and port
        packet.setAddress(returnIPAddress);
        packet.setPort(returnPort);
        socket.send(packet);
      } while (!isStopped());
      socket.close(); socket = null;

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch (ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch (Throwable t) {
      restart("Error receiving data", t);
    }
  }

}
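
One detail in the loop above: if exec() throws, sem.release() is skipped
and the permit stays held, so any later sem.acquire() on the same Semaphore
would block. A common way to avoid that is to release in a finally block.
The sketch below is only an illustration; Permits and withPermit are
hypothetical names and not part of the posted code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;

/** Hypothetical helper, not part of the posted code. */
final class Permits {
    private Permits() {}

    /**
     * Runs body while holding one permit of sem. The permit is returned
     * even if body throws.
     */
    static <T> T withPermit(Semaphore sem, Callable<T> body) throws Exception {
        // If acquire() is interrupted, no permit was taken, so there is
        // nothing to release; that is why it sits outside the try block.
        sem.acquire();
        try {
            return body.call();
        } finally {
            sem.release();
        }
    }
}
```

With this helper the launch would read
p = Permits.withPermit(sem, () -> Runtime.getRuntime().exec(Prog));
with Prog as in the post. This keeps the semaphore usable after a failed
launch; it does not by itself prevent duplicate instances of Prog.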


RE: Race Condition in Streaming Thread

2015-02-27 Thread Nastooh Avessta (navesta)
I am: I issue killall -9 Prog prior to testing.
Cheers,
