#!/usr/bin/python
"""Demonstration program for termios VMIN, VTIME
Usage:
titest [-n] [-s] [-t] [-l] [-0] [-r #] [-w #] [-z #]
-n: Set non-blocking mode in responder Default: off
-s: Use select before read in responder Default: off
-t: Give a reply that is one byte shorter than requested Default: off
-l: Give a reply that is one byte longer than requested Default: off
-0: Give a zero-byte reply Default: off
-r #: Set VTIME to (#+99)/100, i.e. # is the requester timeout in milliseconds, rounded up to VTIME's units of tenths of a second Default: 300
-w #: Set writer inter-character time to # milliseconds Default: 200
-z #: Set request size in bytes Default: 4
Theory:
The program allocates a pseudoterminal and then starts a 'responder' thread.
The responder thread reads one byte; the value of that byte is the length
of the response (possibly affected by the -t, -l or -0 flags).
The responder writes one character at a time, with a delay after each
character (-w). When it receives a '\0' request, it exits.
The main thread invokes the 'requester'. The requester sets non-canonical mode,
VMIN according to the request size (-z), and VTIME according to the -r
argument. Then it optionally sets non-blocking mode (-n), optionally selects
(-s), and then reads. After the read (or failing select) completes, it prints
the result.
(Here, 'master' and 'slave' refer to the two ends of the pseudoterminal.
'slave' behaves like a tty device but actually communicates with the file
descriptor 'master'. This is not related to the modbus concept of master
and slave devices)
Results:
In non-blocking mode, at most one byte is ever read.
Without select, a zero-length reply causes the requester to block forever.
(may require killing the test program with a signal)
When the requester wait is not longer than the responder delay, a short
read (almost always 1 byte) results.
When the requester wait is longer than the responder delay, select is enabled,
and non-blocking mode is not requested, then all three cases (no reply,
short reply, desired reply) work.
A "too long" response is not detected and extra bytes from an earlier
over-length request will be the first bytes presented at a subsequent
read.
(All tests were performed on Ubuntu 10.04 with stock kernel)
Relation to modbus master:
Termios VTIME and VMIN plus select are an adequate mechanism for
receiving modbus replies. A non-responsive slave, an error reply, and a
correct reply can all be received without looping on read or
read+select. Non-blocking mode is harmful and should not be used;
select ensures the program won't block forever.
"""
import fcntl
import getopt
import os
import pty
import select
import sys
import termios
import threading
import time
# Behaviour switches; each matching command-line flag toggles its switch.
do_nonblock = False
do_select = False
do_short = False
do_long = False
do_noreply = False

# Numeric settings and their defaults (the -z, -w and -r options).
request_count = 4
responder_delay = 200
requester_wait = 300

opts, args = getopt.getopt(sys.argv[1:], 'nstl0r:w:z:')
for flag, value in opts:
    if flag == '-n':
        do_nonblock = not do_nonblock
    elif flag == '-s':
        do_select = not do_select
    elif flag == '-t':
        do_short = not do_short
    elif flag == '-l':
        do_long = not do_long
    elif flag == '-0':
        do_noreply = not do_noreply
    elif flag == '-w':
        responder_delay = int(value)
    elif flag == '-r':
        requester_wait = int(value)
    elif flag == '-z':
        request_count = int(value)

# Both ends of the pseudoterminal: the responder uses `master`,
# the requester uses `slave`.
master, slave = pty.openpty()
class ResponderThread(threading.Thread):
    """Thread that answers requests arriving on the pty ``master`` descriptor.

    A request is a single byte whose numeric value is the number of reply
    bytes wanted.  The reply is written one byte at a time, sleeping
    ``responder_delay`` milliseconds after each byte.  The module-level
    flags ``do_noreply``, ``do_short`` and ``do_long`` suppress the reply,
    shorten it by one byte, or lengthen it by one byte.

    The thread returns when it reads a zero request byte, when the read
    reports end-of-file, or when it notices that ``quit`` has been set.
    ``quit`` is only checked after a request has been read and before each
    reply byte is written, so it cannot interrupt a read that is blocked.
    """

    def __init__(self):
        self.quit = False
        threading.Thread.__init__(self)

    def run(self):
        """Serve requests until told to stop (see the class docstring)."""
        while True:
            c = os.read(master, 1)
            if not c:
                # End-of-file: ord() would raise TypeError on an empty read.
                return
            o = ord(c)
            if self.quit or not o:
                return
            if do_noreply:
                continue
            if do_short:
                o -= 1
            if do_long:
                o += 1
            for _ in range(o):
                if self.quit:
                    return
                # Bytes literal: os.write() rejects text strings on
                # Python 3, and b"..." is still valid on Python 2.6+.
                os.write(master, b"A")
                time.sleep(responder_delay / 1000.)
def requester():
    """Issue one request on the pty ``slave`` and print how the read went.

    Steps, all driven by module-level settings:

    1. Put ``slave`` into non-canonical, no-echo mode with VMIN set to
       ``request_count`` and VTIME derived from ``requester_wait``.
    2. If ``do_nonblock`` is set, switch the descriptor to O_NONBLOCK.
    3. Write a single request byte whose value is ``request_count``.
    4. If ``do_select`` is set, wait up to ``requester_wait`` milliseconds
       in select(); on timeout print "select failed" and stop.
    5. Otherwise read up to ``request_count`` bytes and print how many
       arrived.

    On both exits a zero byte is written, which is the request value that
    makes the responder thread return.
    """
    attr = termios.tcgetattr(slave)
    # attr[6] is the control-character list, attr[3] the local-mode flags.
    attr[6][termios.VMIN] = request_count
    # VTIME counts tenths of a second, so round the millisecond value up.
    # Floor division keeps the result an int under Python 3's true
    # division; tcsetattr() does not accept a float here.
    attr[6][termios.VTIME] = (requester_wait + 99) // 100
    attr[3] &= ~(termios.ICANON | termios.ECHO)
    termios.tcsetattr(slave, termios.TCSANOW, attr)

    if do_nonblock:
        flags = fcntl.fcntl(slave, fcntl.F_GETFL)
        fcntl.fcntl(slave, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    # Build the request as a real byte string so os.write() accepts it on
    # both Python 2.6+ and Python 3 (chr() returns text on Python 3).
    os.write(slave, bytes(bytearray([request_count])))

    if do_select:
        readable, _, _ = select.select([slave], [], [],
                                       requester_wait / 1000.)
        if not readable:
            print("Wanted %d, select failed" % request_count)
            os.write(slave, b"\0")
            return

    got = len(os.read(slave, request_count))
    print("Wanted %d, got %d" % (request_count, got))
    os.write(slave, b"\0")
# Run the responder in the background while the main thread plays requester.
w = ResponderThread()
w.start()
try:
    requester()
finally:
    w.quit = True
    # The responder only looks at `quit` between system calls.  If
    # requester() raised or was interrupted before sending its final zero
    # byte, the responder may still be blocked reading the next request and
    # join() would never return.  A zero request byte wakes it up and makes
    # it return; when the responder has already exited the extra byte is
    # simply left unread.
    try:
        os.write(slave, b"\0")
    except OSError:
        # Best effort only: fall through to join() as before.
        pass
    w.join()
# Copyright (C) 2010 Jeff Epler
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
------------------------------------------------------------------------------
ThinkGeek and WIRED's GeekDad team up for the Ultimate
GeekDad Father's Day Giveaway. ONE MASSIVE PRIZE to the
lucky parental unit. See the prize list and enter to win:
http://p.sf.net/sfu/thinkgeek-promo
_______________________________________________
Emc-developers mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/emc-developers