Author: arigo <[email protected]>
Branch: 
Changeset: r59446:92a2d4bc9ea7
Date: 2012-12-15 13:50 +0100
http://bitbucket.org/pypy/pypy/changeset/92a2d4bc9ea7/

Log:    Merged in bdkearns/pypy (pull request #96: fix fcntl.lockf, improve
        fcntl tests, fix test_ioctl failure)

diff --git a/pypy/module/fcntl/interp_fcntl.py 
b/pypy/module/fcntl/interp_fcntl.py
--- a/pypy/module/fcntl/interp_fcntl.py
+++ b/pypy/module/fcntl/interp_fcntl.py
@@ -74,21 +74,6 @@
     return wrap_oserror(space, OSError(errno, funcname),
                         exception_name = 'w_IOError')
 
-def _check_flock_op(space, op):
-
-    if op == LOCK_UN:
-        l_type = F_UNLCK
-    elif op & LOCK_SH:
-        l_type = F_RDLCK
-    elif op & LOCK_EX:
-        l_type = F_WRLCK
-    else:
-        raise OperationError(space.w_ValueError,
-            space.wrap("unrecognized flock argument"))
-    l = lltype.malloc(_flock.TO, flavor='raw')
-    l.c_l_type = rffi.cast(rffi.SHORT, l_type)
-    return l
-
 @unwrap_spec(op=int, w_arg=WrappedDefault(0))
 def fcntl(space, w_fd, op, w_arg):
     """fcntl(fd, op, [arg])
@@ -143,21 +128,14 @@
     manual flock(3) for details.  (On some systems, this function is
     emulated using fcntl().)"""
 
-    fd = space.c_filedescriptor_w(w_fd)
-
     if has_flock:
+        fd = space.c_filedescriptor_w(w_fd)
+        op = rffi.cast(rffi.INT, op)        # C long => C int
         rv = c_flock(fd, op)
         if rv < 0:
             raise _get_error(space, "flock")
     else:
-        l = _check_flock_op(space, op)
-        rffi.setintfield(l, 'c_l_whence', 0)
-        rffi.setintfield(l, 'c_l_start', 0)
-        rffi.setintfield(l, 'c_l_len', 0)
-        op = [F_SETLKW, F_SETLK][int(bool(op & LOCK_NB))]
-        op = rffi.cast(rffi.INT, op)        # C long => C int
-        fcntl_flock(fd, op, l)
-        lltype.free(l, flavor='raw')
+        lockf(space, w_fd, op)
 
 @unwrap_spec(op=int, length=int, start=int, whence=int)
 def lockf(space, w_fd, op, length=0, start=0, whence=0):
@@ -187,22 +165,28 @@
 
     fd = space.c_filedescriptor_w(w_fd)
 
-    l = _check_flock_op(space, op)
-    if start:
+    if op == LOCK_UN:
+        l_type = F_UNLCK
+    elif op & LOCK_SH:
+        l_type = F_RDLCK
+    elif op & LOCK_EX:
+        l_type = F_WRLCK
+    else:
+        raise OperationError(space.w_ValueError,
+            space.wrap("unrecognized lock operation"))
+
+    op = [F_SETLKW, F_SETLK][int(bool(op & LOCK_NB))]
+    op = rffi.cast(rffi.INT, op)        # C long => C int
+
+    l = lltype.malloc(_flock.TO, flavor='raw')
+    try:
+        rffi.setintfield(l, 'c_l_type', l_type)
         rffi.setintfield(l, 'c_l_start', int(start))
-    else:
-        rffi.setintfield(l, 'c_l_start', 0)
-    if len:
         rffi.setintfield(l, 'c_l_len', int(length))
-    else:
-        rffi.setintfield(l, 'c_l_len', 0)
-
-    l.c_l_whence = rffi.cast(rffi.SHORT, whence)
-
-    try:
-        op = [F_SETLKW, F_SETLK][int(bool(op & LOCK_NB))]
-        op = rffi.cast(rffi.INT, op)        # C long => C int
-        fcntl_flock(fd, op, l)
+        rffi.setintfield(l, 'c_l_whence', int(whence))
+        rv = fcntl_flock(fd, op, l)
+        if rv < 0:
+            raise _get_error(space, "fcntl")
     finally:
         lltype.free(l, flavor='raw')
 
diff --git a/pypy/module/fcntl/test/test_fcntl.py 
b/pypy/module/fcntl/test/test_fcntl.py
--- a/pypy/module/fcntl/test/test_fcntl.py
+++ b/pypy/module/fcntl/test/test_fcntl.py
@@ -100,22 +100,43 @@
 
     def test_flock(self):
         import fcntl
-        import sys
+        import os
+        import errno
 
         f = open(self.tmp + "c", "w+")
 
         raises(TypeError, fcntl.flock, "foo")
         raises(TypeError, fcntl.flock, f, "foo")
-        fcntl.flock(f, fcntl.LOCK_SH)
-        # this is an error EWOULDBLOCK, man: The file is locked and the
-        # LOCK_NB flag was selected.
-        raises(IOError, fcntl.flock, f, fcntl.LOCK_NB)
+
+        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+
+        pid = os.fork()
+        if pid == 0:
+            rval = 2
+            try:
+                fcntl.flock(open(f.name, f.mode), fcntl.LOCK_EX | 
fcntl.LOCK_NB)
+            except IOError, e:
+                if e.errno not in (errno.EACCES, errno.EAGAIN):
+                    raise
+                rval = 0
+            else:
+                rval = 1
+            finally:
+                os._exit(rval)
+
+        assert pid > 0
+        (pid, status) = os.waitpid(pid, 0)
+        assert os.WIFEXITED(status) == True
+        assert os.WEXITSTATUS(status) == 0
+
         fcntl.flock(f, fcntl.LOCK_UN)
 
         f.close()
 
     def test_lockf(self):
         import fcntl
+        import os
+        import errno
 
         f = open(self.tmp + "d", "w+")
 
@@ -124,7 +145,27 @@
         raises(ValueError, fcntl.lockf, f, -256)
         raises(ValueError, fcntl.lockf, f, 256)
 
-        fcntl.lockf(f, fcntl.LOCK_SH)
+        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+
+        pid = os.fork()
+        if pid == 0:
+            rval = 2
+            try:
+                fcntl.lockf(open(f.name, f.mode), fcntl.LOCK_EX | 
fcntl.LOCK_NB)
+            except IOError, e:
+                if e.errno not in (errno.EACCES, errno.EAGAIN):
+                    raise
+                rval = 0
+            else:
+                rval = 1
+            finally:
+                os._exit(rval)
+
+        assert pid > 0
+        (pid, status) = os.waitpid(pid, 0)
+        assert os.WIFEXITED(status) == True
+        assert os.WEXITSTATUS(status) == 0
+
         fcntl.lockf(f, fcntl.LOCK_UN)
 
         f.close()
@@ -132,11 +173,10 @@
     def test_ioctl(self):
         import fcntl
         import array
-        import sys, os
+        import os
 
         try:
             from termios import TIOCGPGRP
-            import pty
         except ImportError:
             skip("don't know how to test ioctl() on this platform")
 
@@ -145,10 +185,10 @@
         #raises(TypeError, fcntl.ioctl, 0, TIOCGPGRP, float(0))
         raises(TypeError, fcntl.ioctl, 0, TIOCGPGRP, 1, "foo")
 
-        child_pid, mfd = pty.fork()
-        if child_pid == 0:
-            # We're the child
-            return
+        try:
+            mfd = open("/dev/tty", 'r')
+        except IOError:
+            skip("couldn't open /dev/tty")
         try:
             buf = array.array('i', [0])
             res = fcntl.ioctl(mfd, TIOCGPGRP, buf, True)
@@ -169,7 +209,7 @@
             res = fcntl.ioctl(mfd, TIOCGPGRP, "\x00\x00\x00\x00")
             assert res == expected
         finally:
-            os.close(mfd)
+            mfd.close()
 
     def test_ioctl_int(self):
         import os
@@ -177,21 +217,17 @@
 
         try:
             from termios import TCFLSH, TCIOFLUSH
-            import pty
         except ImportError:
             skip("don't know how to test ioctl() on this platform")
 
-        mfd, sfd = pty.openpty()
+        try:
+            mfd = open("/dev/tty", 'r')
+        except IOError:
+            skip("couldn't open /dev/tty")
         try:
             assert fcntl.ioctl(mfd, TCFLSH, TCIOFLUSH) == 0
         finally:
-            os.close(mfd)
-            os.close(sfd)
-
-    def test_lockf_with_ex(self):
-        import fcntl
-        f = open(self.tmp, "w")
-        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+            mfd.close()
 
     def test_large_flag(self):
         import sys
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to