Package: lfm
Version: 2.2-1
Severity: normal
Hi you could make a solution is to use the decoding/encoding of variable
taking care of utf-8 and ascii
Please check and improve the code below (please a fix noted)
you should try chars such as :
öäüpöäÄÖÜ
Eléanore
please leave me an email when you improved this utils.py file.
Thank you
# -*- coding: utf-8 -*-
"""utils.py
This module contains useful functions.
"""
import os
import os.path
import sys
import time
import signal
import select
import cPickle
import curses
import codecs
import compress
import messages
from __init__ import sysprogs, g_encoding
######################################################################
##### module variables
app = None
# first common codecs, to avoid slow down decoding
codecs_list = [g_encoding, 'latin-1', 'utf-8', 'cp437' ]
# if program is too slow due to too many encodings to try,
# comment out next lines or put the encodings you use first
import encodings.aliases
# Set is deprecated on python v2.6+, but set doesn't exist < v2.6
cds = {}
for c in encodings.aliases.aliases.values():
cds[c] = 1
codecs_list += sorted(cds.keys())
######################################################################
##### InterProcess Communication
class IPC(object):
def __init__(self):
pipe_r, pipe_w = os.pipe()
self.rfd = os.fdopen(pipe_r, 'rb', 0)
self.wfd = os.fdopen(pipe_w, 'wb', 0)
def send(self, buf):
cPickle.dump(buf, self.wfd)
# time.sleep(0.01)
cPickle.dump(None, self.wfd)
def receive(self):
ready = select.select([self.rfd], [], [], 0.001) # 0.01
if self.rfd in ready[0]:
try:
buf = cPickle.load(self.rfd)
except:
return -1, 'Error unmarshaling'
if buf is None:
return 0, None
try:
arg1, arg2 = buf
except:
return -1, 'Malformed response'
return 1, buf
return 0, None
def close(self):
self.rfd.close()
self.wfd.close()
######################################################################
##### Process Loop Base Class
class ProcessLoopBase(object):
"""Run a function in background, so it can be stopped, continued, etc.
There is also a graphical animation to show the program still runs and
has not crashed."""
anim_char = ('|', '/', '-', '\\')
def __init__(self, action='', func=None, *args):
self.action = action # action label
self.func = func # function to run in child
self.args = args # additional args for func
self.ret = [] # information to return
self.filename = '' # current filename
self.file_i = 0 # index to current item
self.cursor_i = 0 # index to cursor animation step
self.init_gui()
def init_gui(self):
self.cur_win = curses.newpad(1, 2)
self.cur_win.bkgd(curses.color_pair(1))
if self.processloop_type == 1:
dlg = messages.ProgressBarWindow
elif self.processloop_type == 2:
dlg = messages.ProgressBarWindow2
self.dlg = dlg(self.action, 'Press Ctrl-C to stop',
curses.color_pair(1), curses.color_pair(1),
curses.color_pair(20), curses.color_pair(4),
waitkey=False)
self.dlg.show()
def end_gui(self):
self.dlg.finish()
self.show_parent()
def show_parent(self):
self.dlg.ishidden = True
app.display()
def show_win(self):
raise NotImplementedError # in ProcessLoopBase_X class
def animate_cursor(self):
self.cur_win.erase()
self.cur_win.addch(ProcessLoopBase.anim_char[self.cursor_i%4],
curses.color_pair(1) | curses.A_BOLD)
self.cur_win.refresh(0, 0, 0, app.maxw-2, 1, app.maxw-1)
self.cursor_i += 1
if self.cursor_i > 3:
self.cursor_i = 0
def check_keys(self):
ch = self.dlg.getch()
if ch == 0x03:
os.kill(self.pid_child, signal.SIGSTOP)
self.show_parent()
ans = messages.confirm('Stop process',
'Stop \"%s\"' % self.action.lower(), 1)
if ans:
os.kill(self.pid_child, signal.SIGKILL)
os.wait()
return -100
else:
os.kill(self.pid_child, signal.SIGCONT)
return 1
return 0
def wait_for_answer(self):
while True:
# feedback from user
status = self.check_keys()
if status == -100: # stopped and ended by user
return ('stopped_by_user', None)
elif status == 1: # stopped and continued by user
self.show_win()
self.animate_cursor()
# check response
code, buf = self.c2p.receive()
if code == 1:
return buf
elif code == -1:
return ('internal_error', buf)
def ask_confirmation(self):
raise NotImplementedError # in final class
def prepare_args(self):
raise NotImplementedError # in final class
def process_response(self, result):
raise NotImplementedError # in final class
def exec_file(self, args):
# update progress dialog
self.show_win()
# send data to child
self.p2c.send(('exec', args))
# wait for answer and process it
ans, result = self.wait_for_answer()
if ans == 'stopped_by_user':
return -1
elif ans == 'internal_error':
self.show_parent()
messages.error('Cannot %s\n' % self.action.lower() +
'Parent: Internal Error: ' + result)
return 0
elif ans == 'error':
self.show_parent()
messages.error('Cannot %s\n' % self.action.lower() +
'Parent: Error: ' + result)
return 0
elif ans == 'result':
return self.process_response(result)
else:
self.show_parent()
messages.error('Cannot %s\n' % self.action.lower() +
'Parent: Bad response from child')
return 0
def return_data(self):
return self.ret
def run_pre(self):
self.p2c = IPC()
self.c2p = IPC()
self.pid_child = os.fork()
if self.pid_child < 0: # error
messages.error('Cannot %s\n' % self.action.lower() +
'Can\'t run function')
return -1
elif self.pid_child == 0: # child
self.child_process()
os._exit(0)
def run(self):
raise NotImplementedError # in ProcessLoopBase_X class
def run_post(self):
self.p2c.send(('quit', None))
self.p2c.close()
self.c2p.close()
try:
os.wait()
except OSError:
pass
self.end_gui()
def child_process(self):
while True:
# wait for command to execute
while True:
code, buf = self.p2c.receive()
if code == 1:
break
elif code == -1:
self.c2p.send(('error', 'Child: ' + buf))
continue
else:
continue
cmd, args = buf
# check command
if cmd == 'quit':
break
elif cmd == 'exec':
res = self.func(*args)
self.c2p.send(('result', res))
continue
else:
result = ('error', 'Child: Bad command from parent')
self.c2p.send(('result', result))
continue
# end
# time.sleep(.25) # time to let parent get return value
os._exit(0)
######################################################################
##### Process Loop Base Class, 1 progressbar
class ProcessLoopBase_1(ProcessLoopBase):
def __init__(self, action='', func=None, lst=[], *args):
self.processloop_type = 1
super(ProcessLoopBase_1, self).__init__(action, func, *args)
self.lst = lst
self.length = len(lst)
def show_win(self):
filename = self.filename
percent = 100 * self.file_i / self.length
idx_str = '%d/%d' % (self.file_i, self.length)
if self.dlg.ishidden:
self.dlg.show(filename, percent, idx_str)
else:
self.dlg.update(filename, percent, idx_str)
def run(self):
if ProcessLoopBase.run_pre(self) == -1:
return
for self.filename in self.lst:
ret = self.ask_confirmation()
if ret == -1:
break
elif ret == 0:
continue
self.file_i += 1
args = self.prepare_args()
ret = self.exec_file(args)
if ret == -1:
self.ret = -1 # stopped by user
break
ProcessLoopBase.run_post(self)
return self.return_data()
##### Process Loop DirSize
class ProcessLoopDirSize(ProcessLoopBase_1):
def ask_confirmation(self):
return 1
def prepare_args(self):
return (self.filename, ) + self.args
def process_response(self, result):
self.ret.append(result)
return 0
##### Process Loop Un/Compress
class ProcessLoopUnCompress(ProcessLoopBase_1):
def ask_confirmation(self):
return 1
def prepare_args(self):
return (self.filename, ) + self.args
def process_response(self, result):
if isinstance(result, tuple): # error
st, msg = result
if st == -1:
self.show_parent()
messages.error('Cannot %s\n' % self.action.lower() + msg)
return 0
##### Process Loop Rename
class ProcessLoopRename(ProcessLoopBase_1):
def ask_confirmation(self):
from actions import doEntry
buf = 'Rename \'%s\' to' % self.filename
tabpath = app.act_pane.act_tab.path
self.show_parent()
newname = doEntry(tabpath, 'Rename', buf, self.filename,
with_history='file')
if newname:
self.newname = newname
return 1
else:
return 0
def prepare_args(self):
return (self.filename, ) + self.args + (self.newname, )
def process_response(self, result):
if isinstance(result, unicode) or isinstance(result, str): # overwrite?
self.show_parent()
ans = messages.confirm(self.action,
'Overwrite \'%s\'' % result, 1)
if ans == -1:
return -1
elif ans == 0:
return 0
elif ans == 1:
args = (self.filename, ) + self.args + (self.newname, False)
self.dlg.show()
return self.exec_file(args)
elif isinstance(result, tuple): # error from child
self.show_parent()
messages.error('Cannot %s\n' % self.action.lower() +
self.filename + ': %s (%s)' % result)
return 0
else:
return 0
##### Process Loop Rename
class ProcessLoopBackup(ProcessLoopBase_1):
def ask_confirmation(self):
return 1
def prepare_args(self):
return (self.filename, ) + self.args
def process_response(self, result):
if isinstance(result, unicode) or isinstance(result, str): # overwrite?
self.show_parent()
ans = messages.confirm(self.action,
'Overwrite \'%s\'' % result, 1)
if ans == -1:
return -1
elif ans == 0:
return 0
elif ans == 1:
args = (self.filename, ) + self.args + (False, )
self.dlg.show()
return self.exec_file(args)
elif isinstance(result, tuple): # error from child
self.show_parent()
messages.error('Cannot %s\n' % self.action.lower() +
self.filename + ': %s (%s)' % result)
return 0
else:
return 0
######################################################################
##### Process Loop Base Class, 2 progressbar
class ProcessLoopBase_2(ProcessLoopBase):
def __init__(self, action='', func=None, pc=None, *args):
self.processloop_type = 2
super(ProcessLoopBase_2, self).__init__(action, func, *args)
self.pc = pc # PathContents
self.filesize_aggr = 0 # partial sum of processed files
def show_win(self):
filename = self.filename.replace(self.pc.basepath+os.sep, '')
perc_size = 100 * self.filesize_aggr / self.pc.tsize
perc_count = 100 * self.file_i / self.pc.tlength
idx_str = '%d/%d' % (self.file_i, self.pc.tlength)
if self.dlg.ishidden:
self.dlg.show(filename, perc_size, perc_count, idx_str)
else:
self.dlg.update(filename, perc_size, perc_count, idx_str)
def run(self):
for filename, err in self.pc.errors:
self.show_parent()
messages.error('Cannot %s\n' % self.action.lower() +
filename + ': %s (%s)' % err)
if ProcessLoopBase.run_pre(self) == -1:
return
for self.filename, filesize in self.pc.iter_walk(reverse=self.rev):
ret = self.ask_confirmation()
if ret == -1:
break
elif ret == 0:
continue
self.file_i += 1
self.filesize_aggr += filesize
args = self.prepare_args()
ret = self.exec_file(args)
if ret == -1:
self.ret = -1 # stopped by user
break
ProcessLoopBase.run_post(self)
return self.return_data()
##### Process Loop Copy
class ProcessLoopCopy(ProcessLoopBase_2):
def __init__(self, action='', func=None, pc=None, *args):
super(ProcessLoopCopy, self).__init__(action, func, pc, *args)
self.rev = False
self.overwrite_all = not app.prefs.confirmations['overwrite']
self.overwrite_none = False
def ask_confirmation(self):
return 1
def prepare_args(self):
if self.pc.basepath == os.sep:
filename = self.filename.replace(self.pc.basepath, '')
else:
filename = self.filename.replace(self.pc.basepath+os.sep, '')
if self.overwrite_all:
return (filename, self.pc.basepath) + self.args + (False, )
else:
return (filename, self.pc.basepath) + self.args
def process_response(self, result):
if isinstance(result, unicode) or isinstance(result, str): # overwrite?
if self.overwrite_none:
self.ret.append(self.filename)
return 0
self.show_parent()
ans = messages.confirm_all_none(self.action,
'Overwrite \'%s\'' % result, 1)
if ans == -1:
self.ret.append(self.filename)
return -1
elif ans == -2:
self.ret.append(self.filename)
self.overwrite_none = True
return 0
elif ans == 0:
self.ret.append(self.filename)
return 0
elif ans == 1:
pass
elif ans == 2:
self.overwrite_all = True
if self.pc.basepath == os.sep:
filename = self.filename.replace(self.pc.basepath, '')
else:
filename = self.filename.replace(self.pc.basepath+os.sep, '')
args = (filename, self.pc.basepath) + self.args + (False, )
return self.exec_file(args)
elif isinstance(result, tuple): # error from child
self.ret.append(self.filename)
self.show_parent()
messages.error('Cannot %s\n' % self.action.lower() +
self.filename + ': %s (%s)' % result)
return 0
else:
return 0
##### Process Loop Delete
class ProcessLoopDelete(ProcessLoopBase_2):
def __init__(self, action='', func=None, pc=None, *args):
super(ProcessLoopDelete, self).__init__(action, func, pc, *args)
self.rev = True
self.delete_all = not app.prefs.confirmations['delete']
def ask_confirmation(self):
if self.delete_all:
return 2
if app.prefs.confirmations['delete']:
self.show_parent()
ans = messages.confirm_all('Delete', 'Delete \'%s\'' %
self.filename, 1)
if ans == 2:
self.delete_all = True
return ans
def prepare_args(self):
return (self.filename, ) + self.args
def process_response(self, result):
if isinstance(result, tuple): # error from child
self.dlg.ishidden = True
self.show_parent()
messages.error('Cannot %s\n' % self.action.lower() +
self.filename + ': %s (%s)' % result)
return 0
######################################################################
##### Process Func
class ProcessFunc(object):
"""Run a function in background, so it can be stopped, continued, etc.
There is also a graphical animation to show the program still runs and
has not crashed.
Parameters:
title: title of info window
subtitle: subtitle of info window
func: function to run
*args: arguments to pass to the function
Returns:
(status, message)"""
anim_char = ('|', '/', '-', '\\')
def __init__(self, title='', subtitle='', func=None, *args):
self.func = func
self.args = args
self.title = title[:app.maxw-14]
self.subtitle = subtitle[:app.maxw-14]
self.init_gui()
self.cursor_i = 0
self.status = 0
self.output = []
self.ret = None
def init_gui(self):
self.cur_win = curses.newpad(1, 2)
self.cur_win.bkgd(curses.color_pair(1))
app.statusbar.win.nodelay(1)
self.show_parent()
self.show_win()
def end_gui(self):
app.statusbar.win.nodelay(0)
self.show_parent()
def show_parent(self):
app.display()
def show_win(self):
messages.win_nokey(self.title, self.subtitle, 'Press Ctrl-C to stop')
def animate_cursor(self):
self.cur_win.erase()
self.cur_win.addch(ProcessFunc.anim_char[self.cursor_i%4],
curses.color_pair(1) | curses.A_BOLD)
self.cur_win.refresh(0, 0, 0, app.maxw-2, 1, app.maxw-1)
self.cursor_i += 1
if self.cursor_i > 3:
self.cursor_i = 0
def check_finish(self):
(pid, status) = os.waitpid(self.pid_child, os.WNOHANG)
if pid > 0:
self.status = status >> 8
return True
else:
return False
def process_result(self):
code, buf = self.c2p.receive()
if code == 1:
self.ret = buf
elif code == -1:
self.show_parent()
messages.error('Cannot %s\n' % self.action.lower() +
'Parent: ' + buf)
self.show_parent()
self.show_win()
else:
pass
def check_keys(self):
ch = app.statusbar.win.getch()
if ch == 0x03:
os.kill(self.pid_child, signal.SIGSTOP)
self.show_parent()
ans = messages.confirm('Stop process',
'%s %s' % (self.title, self.subtitle),
1)
if ans:
os.kill(self.pid_child, signal.SIGKILL)
os.wait()
return -100
else:
self.show_win()
os.kill(self.pid_child, signal.SIGCONT)
return 0
def run(self):
self.c2p = IPC()
self.pid_child = os.fork()
if self.pid_child < 0: # error
messages.error('Cannot run function')
return
elif self.pid_child == 0: # child
self.child_process(self.func, *self.args)
os._exit(0)
# parent
status = 0
while True:
if self.check_finish():
break
self.process_result()
status = self.check_keys()
if status == -100: # stopped by user
self.status = status
break
self.animate_cursor()
# finish and return
self.c2p.close()
try:
os.wait()
except OSError:
pass
self.end_gui()
if self.status == -100: # stopped by user
return -100, 'Stopped by user'
try:
st, buf = self.ret
except:
st, buf = 0, None
return st, buf
def child_process(self, func, *args):
res = func(*args)
self.c2p.send(res)
os._exit(0)
######################################################################
##### run_shell
# run command via shell and optionally return output, popen version
def run_shell_popen(cmd, path, return_output=False):
if not cmd:
return 0, ''
cmd = 'cd "%s" && %s' % (path, cmd)
p = popen2.Popen3(cmd, capturestderr=True)
p.tochild.close()
outfd, errfd = p.fromchild, p.childerr
output, error = [], []
while True:
# check if finished
(pid, status) = os.waitpid(p.pid, os.WNOHANG)
if pid > 0:
status = status >> 8
o = p.fromchild.readline()
while o: # get output before quit
o = o.strip()
if o:
output.append(o)
o = p.fromchild.readline()
e = p.childerr.readline()
while e: # get error before quit
e = e.strip()
if e:
error.append(e)
e = p.childerr.readline()
break
# check for output
ready = select.select([outfd, errfd], [], [], .01)
if outfd in ready[0]:
o = p.fromchild.readline()
if o:
output.append(o)
if errfd in ready[0]:
e = p.childerr.readline()
while e: # get the whole error message
e = e.strip()
if e:
error.append(e)
e = p.childerr.readline()
status = p.wait() >> 8
break
time.sleep(0.1) # extra time to update output in case execution is too
fast
# return
p.fromchild.close()
p.childerr.close()
if status != 0:
error.insert(0, 'Exit code: %d' % status)
return -1, '\n'.join(error)
if error != []:
return -1, '\n'.join(error)
if return_output:
return 0, '\n'.join(output)
else:
return 0, ''
# run in background, system version
def run_in_background_system(cmd, path):
pid = os.fork()
if pid == 0:
try:
maxfd = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError):
maxfd = 256 # default maximum
# os.closerange(0, maxfd) # python v2.6+
for fd in xrange(0, maxfd):
try:
os.close(fd)
except OSError: # ERROR (ignore)
pass
# Redirect the standard file descriptors to /dev/null.
os.open("/dev/null", os.O_RDONLY) # standard input (0)
os.open("/dev/null", os.O_RDWR) # standard output (1)
os.open("/dev/null", os.O_RDWR) # standard error (2)
os.system('cd "%s" && %s' % (path, cmd))
os._exit(0)
else:
pass # don't wait
# get output from a command run in shell, popen version
def get_shell_output_popen(cmd):
i, a = os.popen4(cmd)
buf = a.read()
i.close(), a.close()
return buf.strip()
# get output from a command run in shell, no stderr, popen version
def get_shell_output2_popen(cmd):
i, o, e = os.popen3(cmd)
buf = o.read()
i.close(), o.close(), e.close()
if buf:
return buf.strip()
else:
return ''
# get error from a command run in shell, popen version
def get_shell_output3_popen(cmd):
i, o, e = os.popen3(cmd)
buf = e.read()
i.close(), o.close(), e.close()
if buf:
return buf.strip()
else:
return ''
# run command via shell and optionally return output, subprocess version
def run_shell_subprocess(cmd, path, return_output=False):
if not cmd:
return 0, ''
p = Popen(cmd, cwd=path, shell=True,
stdin=None, stdout=PIPE, stderr=PIPE, close_fds=True)
while p.wait() is None:
time.sleep(0.2)
output, error = p.stdout.read(), p.stderr.read()
p.stdout.close(), p.stderr.close()
if p.returncode < 0:
error = 'Exit code: %d\n' % p.returncode + error
return -1, error
if error != '':
return -1, error
if return_output:
return 0, output
else:
return 0, ''
# run in background, subprocess version
def run_in_background_subprocess(cmd, path):
pid = os.fork()
if pid == 0:
p = Popen(cmd, cwd=path, shell=True, close_fds=True,
stdin=None, stdout=open('/dev/null', 'w'), stderr=STDOUT)
os._exit(0)
else:
pass # don't wait
# get output from a command run in shell, subprocess version
def get_shell_output_subprocess(cmd):
p = Popen(cmd, shell=True,
stdin=None, stdout=PIPE, stderr=STDOUT, close_fds=True)
while p.wait() is None:
time.sleep(0.1)
buf = p.stdout.read()
p.stdout.close()
return buf.strip() if buf else None
# get output from a command run in shell without stderr, subprocess version
def get_shell_output2_subprocess(cmd):
p = Popen(cmd, shell=True,
stdin=None, stdout=PIPE, stderr=PIPE, close_fds=True)
p.stderr.close()
while p.wait() is None:
time.sleep(0.1)
buf = p.stdout.read()
p.stdout.close()
return buf.strip() if buf else None
# get error from a command run in shell, subprocess version
def get_shell_output3_subprocess(cmd):
p = Popen(cmd, shell=True,
stdin=None, stdout=None, stderr=PIPE, close_fds=True)
while p.wait() is None:
time.sleep(0.1)
buf = p.stderr.read()
p.stderr.close()
return buf.strip() if buf else None
######################################################################
##### run_dettached
def run_dettached(prog, *args):
pid = os.fork()
if pid == 0:
os.setsid()
os.chdir('/')
try:
maxfd = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError):
maxfd = 256 # default maximum
# os.closerange(0, maxfd) # python v2.6+
for fd in xrange(0, maxfd):
try:
os.close(fd)
except OSError: # ERROR (ignore)
pass
# Redirect the standard file descriptors to /dev/null.
os.open("/dev/null", os.O_RDONLY) # standard input (0)
os.open("/dev/null", os.O_RDWR) # standard output (1)
os.open("/dev/null", os.O_RDWR) # standard error (2)
pid2 = os.fork()
if pid2 == 0:
os.execlp(prog, prog, *args)
else:
os.waitpid(-1, os.P_NOWAIT)
os._exit(0)
else:
os.wait()
######################################################################
##### un/compress(ed) files
# compress/uncompress file: gzip/gunzip, bzip2/bunzip2
def do_compress_uncompress_file(filename, path, typ):
if os.path.isabs(filename):
fullfile = filename
filename = os.path.basename(filename)
else:
fullfile = os.path.join(path, filename)
if not os.path.isfile(fullfile):
return -1, '%s: is not a file' % filename
c = compress.check_compressed_file(fullfile)
if c is None or isinstance(c, compress.PackagerTAR):
packager = compress.packagers_by_type[typ]
c = packager(fullfile)
cmd = c.build_compress_cmd()
elif c.type == typ:
cmd = c.build_uncompress_cmd()
else:
return -1, '%s: can\'t un/compress with %s' % \
(filename, compress.packagers_by_type[typ].compress_prog)
st, msg = run_shell(encode(cmd), encode(path), return_output=True)
return st, msg
def compress_uncompress_file(tab, typ):
if tab.selections:
fs = tab.selections[:]
else:
fs = [tab.sorted[tab.file_i]]
ProcessLoopUnCompress('Un/Compress file', do_compress_uncompress_file,
fs, tab.path, typ).run()
tab.selections = []
app.regenerate()
# uncompress directory
def do_uncompress_dir(filename, path, dest, is_tmp=False):
if os.path.isabs(filename):
fullfile = filename
filename = os.path.basename(filename)
else:
fullfile = os.path.join(path, filename)
if not os.path.isfile(fullfile):
return -1, '%s: is not a file' % filename
c = compress.check_compressed_file(fullfile)
if c is None:
return -1, '%s: can\'t uncompress' % filename
cmd = c.build_uncompress_cmd()
st, msg = run_shell(encode(cmd), encode(dest), return_output=True)
if st < 0: # (-100, -1),
# never reached if user stops (-100) because this process is killed
c.delete_uncompress_temp(dest, is_tmp)
return st, msg
def uncompress_dir(tab, dest=None, is_tmp=False):
"""uncompress tarred file in path directory"""
if dest is None:
dest = tab.path
if tab.selections:
fs = tab.selections[:]
else:
fs = [tab.sorted[tab.file_i]]
ProcessLoopUnCompress('Uncompress file', do_uncompress_dir,
fs, tab.path, dest, is_tmp).run()
tab.selections = []
# compress directory: tar and gzip, bzip2
def do_compress_dir(filename, path, typ, dest, is_tmp=False):
if os.path.isabs(filename):
fullfile = filename
filename = os.path.basename(filename)
else:
fullfile = os.path.join(path, filename)
if not os.path.isdir(fullfile):
return -1, '%s: is not a directory' % filename
c = compress.packagers_by_type[typ](fullfile)
if c is None:
return -1, '%s: can\'t compress' % filename
cmd = c.build_compress_cmd()
st, msg = run_shell(encode(cmd), encode(dest), return_output=True)
if st < 0: # (-100, -1):
# never reached if user stops (-100) because this process is killed
c.delete_compress_temp(dest, is_tmp)
return st, msg
def compress_dir(tab, typ, dest=None, is_tmp=False):
"""compress directory to current path"""
if dest is None:
dest = tab.path
if tab.selections:
fs = tab.selections[:]
else:
fs = [tab.sorted[tab.file_i]]
ProcessLoopUnCompress('Compress file', do_compress_dir,
fs, tab.path, typ, dest, is_tmp).run()
tab.selections = []
######################################################################
##### find / grep
# find/grep
def do_findgrep(path, files, pattern):
# escape special chars
pat_re = pattern.replace('\\', '\\\\\\\\').replace('-', '\\-')
pat_re = pat_re.replace('(', '\\(').replace(')', '\\)')
pat_re = pat_re.replace('[', '\\[').replace(']', '\\]')
ign = app.prefs.options['grep_ignorecase'] and 'i' or ''
rex = app.prefs.options['grep_regex'] and 'E' or ''
# 1. find . -type f -iname "*.py" -exec grep -EHni PATTERN {} \;
# the slowest, 10x
# 2. find . -type f -iname "*py" -print0 | xargs --null grep -EHni PATTERN
# maybe the best choice
cmd = '%s "%s" -type f -iname "%s" -print0 | %s --null %s -%sHn%s \"%s\"' %
\
(sysprogs['find'], path, files, sysprogs['xargs'], sysprogs['grep'],
rex, ign, pat_re)
# 3. grep -EHni PATTERN `find . -type f -iname "*.py"`
# don't like `
# 4. grep -REHni PATTERN --include "*.py" .
# the fastest, but non-POSIX, because of: -R, --include
# cmd = '%s -R%sHn%s \"%s\" --include "%s" "%s"' % \
# (sysprogs['grep'], rex, ign, pat_re, files, path)
st, ret = ProcessFunc('Searching',
'Searching for \"%s\" in \"%s\" files' % (pattern,
files),
run_shell, encode(cmd), path, True).run()
if not ret:
return 0, []
if st < 0: # (-100, -1) => error
return st, ret
elif st == 0:
ret = [f.strip() for f in ret.split('\n') if f.strip() != '']
matches = []
if len(ret) > 0:
# filename:linenumber:matching
# note that filename could contain ':', so we have to parse
for l in ret:
if not l:
continue
lst = l.split(':')
if len(lst) == 1: # binary file
linenumber = 0
filename = lst[0].split(' ')[-2] # FIXME: filename can contain
SPC
else:
i = len(lst) - 2
while True:
filename = decode(':'.join(lst[:i]))
if os.path.exists(filename):
break
else:
i -= 1
try:
linenumber = int(lst[i])
except ValueError:
linenumber = 0
filename = filename.replace(path, '')
if filename[0] == os.sep and path != os.sep:
filename = filename[1:]
matches.append((filename, linenumber))
matches = ['%s:%d' % (f, l) for f, l in sorted(matches)]
return 0, matches
# find
def do_find(path, files):
cmd = '%s %s -name \"%s\" -print' % (sysprogs['find'], path, files)
st, ret = ProcessFunc('Searching',
'Searching for \"%s\" files' % files,
run_shell, encode(cmd), path, True).run()
if not ret:
return 0, []
if st < 0: # (-100, -1) => error
return st, ret
elif st == 0:
ret = [f.strip() for f in ret.split('\n') if f.strip() != '']
matches = []
if len(ret) > 0:
for filename in ret:
filename = decode(filename).strip().replace(path, '')
if filename is not None and filename != '':
if filename[0] == os.sep and path != os.sep:
filename = filename[1:]
matches.append(filename)
return 0, sorted(matches)
######################################################################
##### encode/decode strings
def encode(buf):
return buf.encode(g_encoding)
def isASCII(text):
try:
text = unicode(text, 'ASCII', 'strict')
return True
except UnicodeDecodeError:
return False
# False indique que text contient des valeurs supérieures ou égales à 128
# True indique que text utilise (semble utiliser ?) le charset ASCII
def decode_old(text):
try:
text = unicode(text, 'ASCII', 'strict')
return True
except UnicodeDecodeError:
return False
def decode_oldy(buf):
try:
buf = unicode(buf, 'ASCII', 'strict')
return buf
except UnicodeDecodeError:
return False
def decode(buf):
if isinstance(buf, unicode):
return buf
for c in codecs_list:
try:
buf = buf.decode(c)
except UnicodeDecodeError:
buf = unicode("error.please.fix here it", "utf8")
continue
else:
return buf
else:
return buf.decode('ascii', 'replace')
def ask_convert_invalid_encoding_filename(filename):
auto = app.prefs.options['automatic_file_encoding_conversion']
if auto == -1:
return False
elif auto == 1:
return True
elif auto == 0:
ret = messages.confirm('Detected invalid encoding',
'In file <%s>, convert' % filename)
try:
app.display()
except:
pass
return (ret == 1)
else:
raise ValueError
######################################################################
##### useful functions
def get_escaped_filename(filename):
filename = filename.replace('$', '\\$')
if filename.find('"') != -1:
filename = filename.replace('"', '\\"')
return encode(filename)
def get_escaped_command(cmd, filename):
filename = filename.replace('$', '\$')
if filename.find('"') != -1:
filename = filename.replace('"', '\\"')
return '%s \'%s\'' % (encode(cmd), encode(filename))
else:
return '%s \"%s\"' % (encode(cmd), encode(filename))
def run_on_current_file(program, filename):
cmd = get_escaped_command(app.prefs.progs[program], filename)
curses.endwin()
os.system(cmd)
curses.curs_set(0)
######################################################################
if sys.version_info[:2] < (2, 4):
import popen2
run_shell = run_shell_popen
run_in_background = run_in_background_system
get_shell_output = get_shell_output_popen
get_shell_output2 = get_shell_output2_popen
get_shell_output3 = get_shell_output3_popen
else:
from subprocess import Popen, PIPE, STDOUT
run_in_background = run_in_background_subprocess
run_shell = run_shell_subprocess
get_shell_output = get_shell_output_subprocess
get_shell_output2 = get_shell_output2_subprocess
get_shell_output3 = get_shell_output3_subprocess
######################################################################
-- System Information:
Debian Release: 6.0.2
APT prefers stable-updates
APT policy: (500, 'stable-updates'), (500, 'stable')
Architecture: i386 (i686)
Kernel: Linux 2.6.32-5-686 (SMP w/2 CPU cores)
Locale: LANG=C, LC_CTYPE=C (charmap=ANSI_X3.4-1968)
Shell: /bin/sh linked to /bin/dash
Versions of packages lfm depends on:
ii python 2.6.6-3+squeeze6 interactive high-level object-orie
ii python-support 1.0.10 automated rebuilding support for P
lfm recommends no packages.
lfm suggests no packages.
-- no debconf information
--
To UNSUBSCRIBE, email to [email protected]
with a subject of "unsubscribe". Trouble? Contact [email protected]