Author: Amaury Forgeot d'Arc <[email protected]>
Branch:
Changeset: r75283:0c1f65b8adad
Date: 2015-01-10 13:32 +0100
http://bitbucket.org/pypy/pypy/changeset/0c1f65b8adad/
Log: Move implementation of termios functions from ll_termios to
rtermios.py
diff --git a/pypy/module/termios/__init__.py b/pypy/module/termios/__init__.py
--- a/pypy/module/termios/__init__.py
+++ b/pypy/module/termios/__init__.py
@@ -1,5 +1,7 @@
from pypy.interpreter.mixedmodule import MixedModule
+from rpython.rlib import rtermios
+
class Module(MixedModule):
"This module provides an interface to the Posix calls for tty I/O control.\n\
For a complete description of these calls, see the Posix or Unix manual\n\
@@ -23,10 +25,6 @@
'error' : 'space.fromcache(interp_termios.Cache).w_error',
}
-# XXX this is extremaly not-portable, but how to prevent this?
-
-import termios
-for i in dir(termios):
- val = getattr(termios, i)
- if i.isupper() and type(val) is int:
- Module.interpleveldefs[i] = "space.wrap(%s)" % val
+ for name in rtermios.all_constants:
+ value = getattr(rtermios, name)
+ interpleveldefs[name] = "space.wrap(%s)" % value
diff --git a/pypy/module/termios/interp_termios.py b/pypy/module/termios/interp_termios.py
--- a/pypy/module/termios/interp_termios.py
+++ b/pypy/module/termios/interp_termios.py
@@ -6,7 +6,6 @@
from pypy.interpreter.gateway import unwrap_spec
from pypy.interpreter.error import wrap_oserror, OperationError
from rpython.rlib import rtermios
-import termios
class Cache:
def __init__(self, space):
@@ -52,9 +51,9 @@
l_w = [space.wrap(i) for i in [iflag, oflag, cflag, lflag, ispeed, ospeed]]
# last one need to be chosen carefully
cc_w = [space.wrap(i) for i in cc]
- if lflag & termios.ICANON:
- cc_w[termios.VMIN] = space.wrap(ord(cc[termios.VMIN][0]))
- cc_w[termios.VTIME] = space.wrap(ord(cc[termios.VTIME][0]))
+ if lflag & rtermios.ICANON:
+ cc_w[rtermios.VMIN] = space.wrap(ord(cc[rtermios.VMIN][0]))
+ cc_w[rtermios.VTIME] = space.wrap(ord(cc[rtermios.VTIME][0]))
w_cc = space.newlist(cc_w)
l_w.append(w_cc)
return space.newlist(l_w)
@@ -63,14 +62,14 @@
def tcsendbreak(space, w_fd, duration):
fd = space.c_filedescriptor_w(w_fd)
try:
- termios.tcsendbreak(fd, duration)
+ rtermios.tcsendbreak(fd, duration)
except OSError, e:
raise convert_error(space, e)
def tcdrain(space, w_fd):
fd = space.c_filedescriptor_w(w_fd)
try:
- termios.tcdrain(fd)
+ rtermios.tcdrain(fd)
except OSError, e:
raise convert_error(space, e)
@@ -78,7 +77,7 @@
def tcflush(space, w_fd, queue):
fd = space.c_filedescriptor_w(w_fd)
try:
- termios.tcflush(fd, queue)
+ rtermios.tcflush(fd, queue)
except OSError, e:
raise convert_error(space, e)
@@ -86,6 +85,6 @@
def tcflow(space, w_fd, action):
fd = space.c_filedescriptor_w(w_fd)
try:
- termios.tcflow(fd, action)
+ rtermios.tcflow(fd, action)
except OSError, e:
raise convert_error(space, e)
diff --git a/pypy/module/termios/test/test_termios.py b/pypy/module/termios/test/test_termios.py
--- a/pypy/module/termios/test/test_termios.py
+++ b/pypy/module/termios/test/test_termios.py
@@ -136,7 +136,7 @@
val = getattr(termios, name)
if name.isupper() and type(val) is int:
d[name] = val
- assert d == self.orig_module_dict
+ assert sorted(d.items()) == sorted(self.orig_module_dict.items())
def test_error(self):
import termios, errno, os
diff --git a/rpython/annotator/classdef.py b/rpython/annotator/classdef.py
--- a/rpython/annotator/classdef.py
+++ b/rpython/annotator/classdef.py
@@ -451,11 +451,3 @@
pass
else:
FORCE_ATTRIBUTES_INTO_CLASSES[WindowsError] = {'winerror': SomeInteger()}
-
-try:
- import termios
-except ImportError:
- pass
-else:
- FORCE_ATTRIBUTES_INTO_CLASSES[termios.error] = \
- {'args': SomeTuple([SomeInteger(), SomeString()])}
diff --git a/rpython/rlib/rtermios.py b/rpython/rlib/rtermios.py
--- a/rpython/rlib/rtermios.py
+++ b/rpython/rlib/rtermios.py
@@ -3,36 +3,178 @@
# returns list of mostly-strings of length one, but with few ints
# inside, so we make sure it works
-import termios
-from termios import *
+from rpython.rtyper.lltypesystem import rffi, lltype
+from rpython.rtyper.tool import rffi_platform
+from rpython.translator.tool.cbuild import ExternalCompilationInfo
+
+from rpython.rlib import rposix
+from rpython.rlib.rarithmetic import intmask
+
+eci = ExternalCompilationInfo(
+ includes = ['termios.h', 'unistd.h', 'sys/ioctl.h']
+)
+
+class CConfig:
+ _compilation_info_ = eci
+ _HAVE_STRUCT_TERMIOS_C_ISPEED = rffi_platform.Defined(
+ '_HAVE_STRUCT_TERMIOS_C_ISPEED')
+ _HAVE_STRUCT_TERMIOS_C_OSPEED = rffi_platform.Defined(
+ '_HAVE_STRUCT_TERMIOS_C_OSPEED')
+
+CONSTANT_NAMES = (
+ # cfgetospeed(), cfsetospeed() constants
+ """B0 B50 B75 B110 B134 B150 B200 B300 B600 B1200 B1800 B2400 B4800 B9600
+ B19200 B38400 B57600 B115200 B230400 B460800 CBAUDEX
+ """
+ # tcsetattr() constants
+ """TCSANOW TCSADRAIN TCSAFLUSH TCSASOFT
+ """
+ # tcflush() constants
+ """TCIFLUSH TCOFLUSH TCIOFLUSH
+ """
+ # tcflow() constants
+ """TCOOFF TCOON TCIOFF TCION
+ """
+ # struct termios.c_iflag constants
+ """IGNBRK BRKINT IGNPAR PARMRK INPCK ISTRIP INLCR IGNCR ICRNL IUCLC
+ IXON IXANY IXOFF IMAXBEL
+ """
+ # struct termios.c_oflag constants
+ """OPOST OLCUC ONLCR OCRNL ONOCR ONLRET OFILL OFDEL
+ NLDLY CRDLY TABDLY BSDLY VTDLY FFDLY
+ """
+ # struct termios.c_oflag-related values (delay mask)
+ """NL0 NL1 CR0 CR1 CR2 CR3 TAB0 TAB1 TAB2 TAB3 XTABS
+ BS0 BS1 VT0 VT1 FF0 FF1
+ """
+ # struct termios.c_cflag constants
+ """CSIZE CSTOPB CREAD PARENB PARODD HUPCL CLOCAL CIBAUD CRTSCTS
+ """
+ # struct termios.c_cflag-related values (character size)
+ """CS5 CS6 CS7 CS8
+ """
+ # struct termios.c_lflag constants
+ """ISIG ICANON XCASE ECHO ECHOE ECHOK ECHONL ECHOCTL ECHOPRT ECHOKE
+ FLUSHO NOFLSH TOSTOP PENDIN IEXTEN
+ """
+ # indexes into the control chars array returned by tcgetattr()
+ """VINTR VQUIT VERASE VKILL VEOF VTIME VMIN VSWTC VSWTCH VSTART VSTOP
+ VSUSP VEOL VREPRINT VDISCARD VWERASE VLNEXT VEOL2
+ """
+ # Others?
+ """CBAUD CDEL CDSUSP CEOF CEOL CEOL2 CEOT CERASE CESC CFLUSH CINTR CKILL
+ CLNEXT CNUL COMMON CQUIT CRPRNT CSTART CSTOP CSUSP CSWTCH CWERASE
+ EXTA EXTB
+ FIOASYNC FIOCLEX FIONBIO FIONCLEX FIONREAD
+ IBSHIFT INIT_C_CC IOCSIZE_MASK IOCSIZE_SHIFT
+ NCC NCCS NSWTCH N_MOUSE N_PPP N_SLIP N_STRIP N_TTY
+ TCFLSH TCGETA TCGETS TCSBRK TCSBRKP TCSETA TCSETAF TCSETAW TCSETS
+ TCSETSF TCSETSW TCXONC
+ TIOCCONS TIOCEXCL TIOCGETD TIOCGICOUNT TIOCGLCKTRMIOS TIOCGPGRP
+ TIOCGSERIAL TIOCGSOFTCAR TIOCGWINSZ TIOCINQ TIOCLINUX TIOCMBIC
+ TIOCMBIS TIOCMGET TIOCMIWAIT TIOCMSET TIOCM_CAR TIOCM_CD TIOCM_CTS
+ TIOCM_DSR TIOCM_DTR TIOCM_LE TIOCM_RI TIOCM_RNG TIOCM_RTS TIOCM_SR
+ TIOCM_ST TIOCNOTTY TIOCNXCL TIOCOUTQ TIOCPKT TIOCPKT_DATA
+ TIOCPKT_DOSTOP TIOCPKT_FLUSHREAD TIOCPKT_FLUSHWRITE TIOCPKT_NOSTOP
+ TIOCPKT_START TIOCPKT_STOP TIOCSCTTY TIOCSERCONFIG TIOCSERGETLSR
+ TIOCSERGETMULTI TIOCSERGSTRUCT TIOCSERGWILD TIOCSERSETMULTI
+ TIOCSERSWILD TIOCSER_TEMT TIOCSETD TIOCSLCKTRMIOS TIOCSPGRP
+ TIOCSSERIAL TIOCSSOFTCAR TIOCSTI TIOCSWINSZ TIOCTTYGSTRUCT
+ """).split()
+
+for name in CONSTANT_NAMES:
+ setattr(CConfig, name, rffi_platform.DefinedConstantInteger(name))
+
+c_config = rffi_platform.configure(CConfig)
+
+# Copy VSWTCH to VSWTC and vice-versa
+if c_config['VSWTC'] is None:
+ c_config['VSWTC'] = c_config['VSWTCH']
+if c_config['VSWTCH'] is None:
+ c_config['VSWTCH'] = c_config['VSWTC']
+
+all_constants = {}
+for name in CONSTANT_NAMES:
+ value = c_config[name]
+ if value is not None:
+ globals()[name] = value
+ all_constants[name] = value
+
+TCFLAG_T = rffi.UINT
+CC_T = rffi.UCHAR
+SPEED_T = rffi.UINT
+
+_add = []
+if c_config['_HAVE_STRUCT_TERMIOS_C_ISPEED']:
+ _add.append(('c_ispeed', SPEED_T))
+if c_config['_HAVE_STRUCT_TERMIOS_C_OSPEED']:
+ _add.append(('c_ospeed', SPEED_T))
+TERMIOSP = rffi.CStructPtr('termios', ('c_iflag', TCFLAG_T), ('c_oflag', TCFLAG_T),
+ ('c_cflag', TCFLAG_T), ('c_lflag', TCFLAG_T),
+ ('c_line', CC_T),
+ ('c_cc', lltype.FixedSizeArray(CC_T, NCCS)), *_add)
+
+def c_external(name, args, result):
+ return rffi.llexternal(name, args, result, compilation_info=eci)
+
+c_tcgetattr = c_external('tcgetattr', [rffi.INT, TERMIOSP], rffi.INT)
+c_tcsetattr = c_external('tcsetattr', [rffi.INT, rffi.INT, TERMIOSP], rffi.INT)
+c_cfgetispeed = c_external('cfgetispeed', [TERMIOSP], SPEED_T)
+c_cfgetospeed = c_external('cfgetospeed', [TERMIOSP], SPEED_T)
+c_cfsetispeed = c_external('cfsetispeed', [TERMIOSP, SPEED_T], rffi.INT)
+c_cfsetospeed = c_external('cfsetospeed', [TERMIOSP, SPEED_T], rffi.INT)
+
+c_tcsendbreak = c_external('tcsendbreak', [rffi.INT, rffi.INT], rffi.INT)
+c_tcdrain = c_external('tcdrain', [rffi.INT], rffi.INT)
+c_tcflush = c_external('tcflush', [rffi.INT, rffi.INT], rffi.INT)
+c_tcflow = c_external('tcflow', [rffi.INT, rffi.INT], rffi.INT)
+
def tcgetattr(fd):
- # NOT_RPYTHON
- try:
- lst = list(termios.tcgetattr(fd))
- except termios.error, e:
- raise OSError(*e.args)
- cc = lst[-1]
- next_cc = []
- for c in cc:
- if isinstance(c, int):
- next_cc.append(chr(c))
- else:
- next_cc.append(c)
- lst[-1] = next_cc
- return tuple(lst)
+ with lltype.scoped_alloc(TERMIOSP.TO) as c_struct:
+ if c_tcgetattr(fd, c_struct) < 0:
+ raise OSError(rposix.get_errno(), 'tcgetattr failed')
+ cc = [chr(c_struct.c_c_cc[i]) for i in range(NCCS)]
+ ispeed = c_cfgetispeed(c_struct)
+ ospeed = c_cfgetospeed(c_struct)
+ result = (intmask(c_struct.c_c_iflag), intmask(c_struct.c_c_oflag),
+ intmask(c_struct.c_c_cflag), intmask(c_struct.c_c_lflag),
+ intmask(ispeed), intmask(ospeed), cc)
+ return result
-def tcsetattr(fd, when, mode):
- # NOT_RPYTHON
- # there are some bizarre requirements for that, stealing directly
- # from cpython
- mode_l = list(mode)
- if mode_l[3] & termios.ICANON:
- cc = mode_l[-1]
- cc[termios.VMIN] = ord(cc[termios.VMIN])
- cc[termios.VTIME] = ord(cc[termios.VTIME])
- mode_l[-1] = cc
- try:
- return termios.tcsetattr(fd, when, mode_l)
- except termios.error, e:
- raise OSError(*e.args)
+
+# This function is not an exact replacement of termios.tcsetattr:
+# the last attribute must be a list of chars.
+def tcsetattr(fd, when, attributes):
+ with lltype.scoped_alloc(TERMIOSP.TO) as c_struct:
+ rffi.setintfield(c_struct, 'c_c_iflag', attributes[0])
+ rffi.setintfield(c_struct, 'c_c_oflag', attributes[1])
+ rffi.setintfield(c_struct, 'c_c_cflag', attributes[2])
+ rffi.setintfield(c_struct, 'c_c_lflag', attributes[3])
+ ispeed = attributes[4]
+ ospeed = attributes[5]
+ cc = attributes[6]
+ for i in range(NCCS):
+ c_struct.c_c_cc[i] = rffi.r_uchar(ord(cc[i][0]))
+ if c_cfsetispeed(c_struct, ispeed) < 0:
+ raise OSError(rposix.get_errno(), 'tcsetattr failed')
+ if c_cfsetospeed(c_struct, ospeed) < 0:
+ raise OSError(rposix.get_errno(), 'tcsetattr failed')
+ if c_tcsetattr(fd, when, c_struct) < 0:
+ raise OSError(rposix.get_errno(), 'tcsetattr failed')
+
+def tcsendbreak(fd, duration):
+ if c_tcsendbreak(fd, duration) < 0:
+ raise OSError(rposix.get_errno(), 'tcsendbreak failed')
+
+def tcdrain(fd):
+ if c_tcdrain(fd) < 0:
+ raise OSError(rposix.get_errno(), 'tcdrain failed')
+
+def tcflush(fd, queue_selector):
+ if c_tcflush(fd, queue_selector) < 0:
+ raise OSError(rposix.get_errno(), 'tcflush failed')
+
+def tcflow(fd, action):
+ if c_tcflow(fd, action) < 0:
+ raise OSError(rposix.get_errno(), 'tcflow failed')
diff --git a/rpython/rtyper/module/test/test_ll_termios.py b/rpython/rlib/test/test_rtermios.py
rename from rpython/rtyper/module/test/test_ll_termios.py
rename to rpython/rlib/test/test_rtermios.py
--- a/rpython/rtyper/module/test/test_ll_termios.py
+++ b/rpython/rlib/test/test_rtermios.py
@@ -77,13 +77,12 @@
def test_tcrest(self):
from rpython.translator.c.test.test_genc import compile
- from rpython.rtyper.module import ll_termios
- import termios, time
+ from rpython.rlib import rtermios
def runs_tcall():
- termios.tcsendbreak(2, 0)
- termios.tcdrain(2)
- termios.tcflush(2, termios.TCIOFLUSH)
- termios.tcflow(2, termios.TCOON)
+ rtermios.tcsendbreak(2, 0)
+ rtermios.tcdrain(2)
+ rtermios.tcflush(2, rtermios.TCIOFLUSH)
+ rtermios.tcflow(2, rtermios.TCOON)
print "ok"
fn = compile(runs_tcall, [], backendopt=False)
diff --git a/rpython/rtyper/extfuncregistry.py b/rpython/rtyper/extfuncregistry.py
--- a/rpython/rtyper/extfuncregistry.py
+++ b/rpython/rtyper/extfuncregistry.py
@@ -10,12 +10,6 @@
from rpython.rtyper.module import ll_os
from rpython.rtyper.module import ll_time
from rpython.rlib import rfloat
-try:
- import termios
-except ImportError:
- pass
-else:
- from rpython.rtyper.module import ll_termios
# the following functions all take one float, return one float
# and are part of math.h
diff --git a/rpython/rtyper/module/ll_termios.py b/rpython/rtyper/module/ll_termios.py
deleted file mode 100644
--- a/rpython/rtyper/module/ll_termios.py
+++ /dev/null
@@ -1,135 +0,0 @@
-
-"""
-The low-level implementation of termios module
-note that this module should only be imported when
-termios module is there
-"""
-
-import termios
-from rpython.rtyper.lltypesystem import rffi
-from rpython.rtyper.lltypesystem import lltype
-from rpython.rtyper.extfunc import lazy_register, register_external
-from rpython.rlib.rarithmetic import intmask
-from rpython.rtyper.extregistry import ExtRegistryEntry
-from rpython.annotator import model as annmodel
-from rpython.rtyper import rclass
-from rpython.rlib import rtermios, rposix
-from rpython.rtyper.tool import rffi_platform
-from rpython.translator.tool.cbuild import ExternalCompilationInfo
-
-eci = ExternalCompilationInfo(
- includes = ['termios.h', 'unistd.h']
-)
-
-class CConfig:
- _compilation_info_ = eci
- NCCS = rffi_platform.DefinedConstantInteger('NCCS')
- _HAVE_STRUCT_TERMIOS_C_ISPEED = rffi_platform.Defined(
- '_HAVE_STRUCT_TERMIOS_C_ISPEED')
- _HAVE_STRUCT_TERMIOS_C_OSPEED = rffi_platform.Defined(
- '_HAVE_STRUCT_TERMIOS_C_OSPEED')
-
-c_config = rffi_platform.configure(CConfig)
-NCCS = c_config['NCCS']
-
-TCFLAG_T = rffi.UINT
-CC_T = rffi.UCHAR
-SPEED_T = rffi.UINT
-INT = rffi.INT
-
-_add = []
-if c_config['_HAVE_STRUCT_TERMIOS_C_ISPEED']:
- _add.append(('c_ispeed', SPEED_T))
-if c_config['_HAVE_STRUCT_TERMIOS_C_OSPEED']:
- _add.append(('c_ospeed', SPEED_T))
-TERMIOSP = rffi.CStructPtr('termios', ('c_iflag', TCFLAG_T), ('c_oflag', TCFLAG_T),
- ('c_cflag', TCFLAG_T), ('c_lflag', TCFLAG_T),
- ('c_line', CC_T),
- ('c_cc', lltype.FixedSizeArray(CC_T, NCCS)), *_add)
-
-def c_external(name, args, result):
- return rffi.llexternal(name, args, result, compilation_info=eci)
-
-c_tcsetattr = c_external('tcsetattr', [INT, INT, TERMIOSP], INT)
-c_cfgetispeed = c_external('cfgetispeed', [TERMIOSP], SPEED_T)
-c_cfgetospeed = c_external('cfgetospeed', [TERMIOSP], SPEED_T)
-c_cfsetispeed = c_external('cfsetispeed', [TERMIOSP, SPEED_T], INT)
-c_cfsetospeed = c_external('cfsetospeed', [TERMIOSP, SPEED_T], INT)
-c_tcsendbreak = c_external('tcsendbreak', [INT, INT], INT)
-c_tcdrain = c_external('tcdrain', [INT], INT)
-c_tcflush = c_external('tcflush', [INT, INT], INT)
-c_tcflow = c_external('tcflow', [INT, INT], INT)
-
-c_tcgetattr = c_external('tcgetattr', [INT, TERMIOSP], INT)
-
-def tcgetattr_llimpl(fd):
- c_struct = lltype.malloc(TERMIOSP.TO, flavor='raw')
-
- try:
- if c_tcgetattr(fd, c_struct) < 0:
- raise OSError(rposix.get_errno(), 'tcgetattr failed')
- cc = [chr(c_struct.c_c_cc[i]) for i in range(NCCS)]
- ispeed = c_cfgetispeed(c_struct)
- ospeed = c_cfgetospeed(c_struct)
- result = (intmask(c_struct.c_c_iflag), intmask(c_struct.c_c_oflag),
- intmask(c_struct.c_c_cflag), intmask(c_struct.c_c_lflag),
- intmask(ispeed), intmask(ospeed), cc)
- return result
- finally:
- lltype.free(c_struct, flavor='raw')
-
-register_external(rtermios.tcgetattr, [int], (int, int, int, int, int, int, [str]),
- llimpl=tcgetattr_llimpl, export_name='termios.tcgetattr')
-
-def tcsetattr_llimpl(fd, when, attributes):
- c_struct = lltype.malloc(TERMIOSP.TO, flavor='raw')
- try:
- c_struct.c_c_iflag = r_uint(attributes[0])
- c_struct.c_c_oflag = r_uint(attributes[1])
- c_struct.c_c_cflag = r_uint(attributes[2])
- c_struct.c_c_lflag = r_uint(attributes[3])
- ispeed = r_uint(attributes[4])
- ospeed = r_uint(attributes[5])
- cc = attributes[6]
- for i in range(NCCS):
- c_struct.c_c_cc[i] = rffi.r_uchar(ord(cc[i][0]))
- if c_cfsetispeed(c_struct, ispeed) < 0:
- raise OSError(rposix.get_errno(), 'tcsetattr failed')
- if c_cfsetospeed(c_struct, ospeed) < 0:
- raise OSError(rposix.get_errno(), 'tcsetattr failed')
- if c_tcsetattr(fd, when, c_struct) < 0:
- raise OSError(rposix.get_errno(), 'tcsetattr failed')
- finally:
- lltype.free(c_struct, flavor='raw')
-
-r_uint = rffi.r_uint
-register_external(rtermios.tcsetattr, [int, int, (int, int, int,
- int, int, int, [str])], llimpl=tcsetattr_llimpl,
- export_name='termios.tcsetattr')
-
-# a bit C-c C-v code follows...
-
-def tcsendbreak_llimpl(fd, duration):
- if c_tcsendbreak(fd, duration):
- raise OSError(rposix.get_errno(), 'tcsendbreak failed')
-register_external(termios.tcsendbreak, [int, int],
- llimpl=tcsendbreak_llimpl,
- export_name='termios.tcsendbreak')
-
-def tcdrain_llimpl(fd):
- if c_tcdrain(fd) < 0:
- raise OSError(rposix.get_errno(), 'tcdrain failed')
-register_external(termios.tcdrain, [int], llimpl=tcdrain_llimpl,
- export_name='termios.tcdrain')
-
-def tcflush_llimpl(fd, queue_selector):
- if c_tcflush(fd, queue_selector) < 0:
- raise OSError(rposix.get_errno(), 'tcflush failed')
-register_external(termios.tcflush, [int, int], llimpl=tcflush_llimpl,
- export_name='termios.tcflush')
-
-def tcflow_llimpl(fd, action):
- if c_tcflow(fd, action) < 0:
- raise OSError(rposix.get_errno(), 'tcflow failed')
-register_external(termios.tcflow, [int, int], llimpl=tcflow_llimpl,
- export_name='termios.tcflow')
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit