...and a clean interface to boot.
So Lars might say where's the socketstream, but this has been a
real struggle and I'm now officially happy.
If anyone wants the wee test program that I used to develop this
then shout and I'll put it somewhere accessible.
Angus
// -*- C++ -*-
/**
* \file pipecomm.h
* This file is part of LyX, the document processor.
* Licence details can be found in the file COPYING.
*
* \author Alejandro Aguilar Sierra
* \author Angus Leeming
*
* Full author contact details are available in file CREDITS
*
* This class manages the named pipes used for communication between programs.
* Rewritten (simplified?) by Angus Leeming for use with pipestream.
*/
#ifndef PIPECOMM_H
#define PIPECOMM_H
#ifdef __GNUG__
#pragma interface
#endif
#include "fdmonitor.h"
#include <streambuf> // for ios_base::openmode
class PipeComm {
public:
///
enum Type {
///
SERVER,
///
CLIENT
};
///
PipeComm(Type);
///
virtual ~PipeComm();
/// Are we up and running?
bool is_open() const;
/** A wrapper for ::open()
* Returns \c true on success.
*/
virtual bool open(char const * pipename_base) = 0;
/** A wrapper for ::close().
* Returns \c true on success.
*/
bool close();
/** A wrapper for ::read().
* Returns the number of bytes read or -1 on error.
*/
int read(char *, size_t);
/** A wrapper for ::write().
* Returns the number of bytes written or -1 on error.
*/
int write(char const *, size_t);
/** Connect and you'll be informed when the input fd has received
* something new.
*/
typedef boost::function0<void> function_type;
///
function_type & input_received() const;
/// clean up in emergency
void cleanUp();
protected:
/// Open pipes.
void openConnection();
/// Close pipes.
void closeConnection();
/// This is -1 if not open.
int infd;
/// This is -1 if not open.
int outfd;
/// Base of pipename including path.
char const * basename_;
///
Type type_;
///
FDMonitor monitor_;
};
class PipeCommClient : public PipeComm {
public:
///
PipeCommClient();
///
virtual bool open(char const * pipename_base);
};
class PipeCommServer : public PipeComm {
public:
///
PipeCommServer();
///
virtual bool open(char const * pipename_base);
};
#endif // PIPECOMM_H
/**
* \file pipecomm.C
* This file is part of LyX, the document processor.
* Licence details can be found in the file COPYING.
*
* \author Alejandro Aguilar Sierra
* \author Angus Leeming
*
* Full author contact details are available in file CREDITS
*
* This class manages the named pipes used for communication between programs.
* Rewritten (simplified?) by Angus Leeming for use with pipestream.
*/
#include "config.h"
#ifdef __GNUG__
#pragma implementation
#endif
#include "pipecomm.h"
#include "debug.h"
#include "lyxalgo.h"
#include "support.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>
#include <boost/scoped_array.hpp>
#include <string>
using lyx::copy_if;
using std::endl;
using std::ios_base;
using std::string;
namespace {
std::string const inname(string const & base, bool server)
{
return base + (server ? ".in" : ".out");
}
std::string const outname(string const & base, bool server)
{
return base + (server ? ".out" : ".in");
}
bool pipeExists(string const & filename)
{
bool const exists = ::access(filename.c_str(), F_OK) == 0;
if (exists) {
lyxerr << "PipeComm: Pipe " << filename << " already exists.\n"
<< "If no other program using this pipe is active, "
"please delete it by hand and try again." << endl;
}
return exists;
}
bool createPipe(string const & filename)
{
bool const success = ::mkfifo(filename.c_str(), 0600) == 0;
if (!success) {
lyxerr << "PipeComm: Could not create pipe " << filename << '\n'
<< strerror(errno) << endl;
}
return success;
}
bool removePipe(string const & filename)
{
bool const success = lyx::unlink(filename) == 0;
if (!success) {
lyxerr << "PipeComm: Could not remove pipe " << filename << '\n'
<< strerror(errno) << endl;
}
return success;
}
int openPipe(string const & filename, bool write)
{
int fd = ::open(filename.c_str(),
write ? O_RDWR : (O_RDONLY|O_NONBLOCK));
if (fd < 0) {
fd = -1;
lyxerr << "PipeComm: Could not open pipe " << filename << '\n'
<< strerror(errno) << endl;
}
return fd;
}
bool closePipe(int fd, string const & filename)
{
if (fd < 0)
return true;
bool const success = ::close(fd) == 0;
if (!success) {
lyxerr << "PipeComm: Could not close pipe " << filename << '\n'
<< strerror(errno)
<< endl;
}
return success;
}
bool setNonblocking(int fd, string const & filename)
{
bool const success = fcntl(fd, F_SETFL, O_NONBLOCK) != -1;
if (!success) {
lyxerr << "PipeComm: Could not set non-blocking status for "
<< filename << '\n'
<< strerror(errno) << endl;
}
return success;
}
} // namespace anon
PipeComm::PipeComm(Type type)
: infd(-1),
outfd(-1),
basename_(0),
type_(type)
{}
PipeComm::~PipeComm()
{
cleanUp();
}
bool PipeComm::is_open() const
{
return infd != -1 && outfd != -1;
}
bool PipeComm::close()
{
if (!is_open())
return false;
closeConnection();
return !is_open();
}
PipeComm::function_type & PipeComm::input_received() const
{
return monitor_.input_received();
}
namespace {
struct UnixCopy {
bool operator()(char c) { return c != '\r'; }
};
} // namespace anon
int PipeComm::read(char * buffer, size_t buffersize)
{
if (!is_open()) {
lyxerr << "PipeComm: Pipes are closed. Cannot read." << endl;
return 0;
}
if (!buffersize) {
lyxerr[Debug::LYXSERVER]
<< "PipeComm: Request to read 0 bytes ignored."
<< endl;
return 0;
}
boost::scoped_array<char> tmpbuf(new char[buffersize]);
int const status = ::read(infd, tmpbuf.get(), buffersize - 1);
if (status > 0) {
tmpbuf[status]= '\0'; // turn it into a c string
char * begin = tmpbuf.get();
char * end = begin + status;
copy_if(begin, end, buffer, UnixCopy());
return int(::strlen(buffer));
}
if (errno != EAGAIN) {
lyxerr << "PipeComm: truncated command. Resetting connection"
<< endl;
// reset connection
closeConnection();
openConnection();
}
return 0;
}
int PipeComm::write(char const * cmsg, size_t csize)
{
if (!is_open()) {
lyxerr << "PipeComm: Pipes are closed. Cannot write." << endl;
return -1;
}
if (!csize) {
lyxerr[Debug::LYXSERVER]
<< "PipeComm: Request to write empty string ignored."
<< endl;
return 0;
}
int const nbytes = ::write(outfd, cmsg, csize);
if (nbytes < 0) {
lyxerr[Debug::LYXSERVER]
<< "PipeComm: Error writing message: "
<< string(cmsg, csize)
<< '\n' << strerror(errno)
<< "\nPipeComm: Resetting connection" << endl;
closeConnection();
openConnection();
}
return nbytes;
}
void PipeComm::cleanUp()
{
if (!basename_) {
return;
}
lyxerr[Debug::LYXSERVER] << "PipeComm: cleaning up!" << endl;
string const in = inname(basename_, type_ == SERVER);
string const out = outname(basename_, type_ == SERVER);
if (infd != -1) {
closePipe(infd, in);
infd = -1;
}
if (outfd != -1) {
closePipe(outfd, out);
outfd = -1;
}
if (type_ == SERVER) {
removePipe(in);
removePipe(out);
}
basename_ = 0;
}
void PipeComm::openConnection()
{
lyxerr[Debug::LYXSERVER] << "PipeComm: Opening connection." << endl;
if (monitor_.running())
monitor_.stop();
// If we are up, that's an error
if (is_open()) {
lyxerr << "PipeComm: Already connected." << endl;
return;
}
if (!basename_) {
lyxerr[Debug::LYXSERVER]
<< "PipeComm: Server is disabled, nothing to do."
<< endl;
return;
}
string const in = inname(basename_, type_ == SERVER);
if ((infd = openPipe(in, false)) == -1) {
cleanUp();
return;
}
string const out = outname(basename_, type_ == SERVER);
if ((outfd = openPipe(out, true)) == -1) {
cleanUp();
return;
}
if (!setNonblocking(outfd, out)) {
cleanUp();
return;
}
monitor_.start(inname(basename_, type_ == SERVER), infd);
lyxerr[Debug::LYXSERVER]
<< "PipeComm: Connection established." << endl;
}
void PipeComm::closeConnection()
{
lyxerr[Debug::LYXSERVER] << "PipeComm: Closing connection." << endl;
if (monitor_.running())
monitor_.stop();
if (!basename_) {
lyxerr[Debug::LYXSERVER]
<< "PipeComm: Server is disabled, nothing to do."
<< endl;
return;
}
if (!is_open()) {
lyxerr[Debug::LYXSERVER]
<< "PipeComm: Already disconnected." << endl;
return;
}
closePipe(infd, inname(basename_, type_ == SERVER));
closePipe(outfd, outname(basename_, type_ == SERVER));
infd = -1;
outfd = -1;
}
PipeCommClient::PipeCommClient()
: PipeComm(CLIENT)
{}
bool PipeCommClient::open(char const * pipename_base)
{
if (is_open())
return false;
basename_ = pipename_base;
openConnection();
return is_open();
}
PipeCommServer::PipeCommServer()
: PipeComm(SERVER)
{}
bool PipeCommServer::open(char const * pipename_base)
{
if (is_open())
return false;
basename_ = pipename_base;
string const in = inname(basename_, true);
string const out = outname(basename_, true);
if (pipeExists(in) || pipeExists(out)) {
basename_ = 0;
return false;
}
if (!createPipe(in)) {
basename_ = 0;
return false;
}
if (!createPipe(out)) {
basename_ = 0;
removePipe(in);
return false;
}
openConnection();
return is_open();
}
// -*- C++ -*-
/**
* \file pipestream.h
* This file is part of LyX, the document processor.
* Licence details can be found in the file COPYING.
*
* \author Angus Leeming
*
* Full author contact details are available in file CREDITS
*
* This code has been adapted from pipestream.C by Gnanasekaran Swaminathan.
* Below is the original copyright notice:
*
* Copyright (C) 1992,1993,1994 Gnanasekaran Swaminathan <[EMAIL PROTECTED]>
*
* Permission is granted to use at your own risk and distribute this software
* in source and binary forms provided the above copyright
* notice and this paragraph are preserved on all copies.
* This software is provided "as is" with no express or implied warranty.
*/
#ifndef PIPESTREAM_H
#define PIPESTREAM_H
#ifdef __GNUG__
#pragma interface
#endif
#include "pipestreamfwd.h"
#include <boost/function/function0.hpp>
#include <cerrno>
#include <streambuf>
#include <iostream>
template<typename commT, typename charT, typename traitsT>
class basic_pipebuf : public std::basic_streambuf<charT, traitsT> {
public:
///
typedef commT comm_type;
typedef charT char_type;
typedef traitsT traits_type;
typedef basic_pipebuf<comm_type, char_type, traits_type> buffer_type;
///
basic_pipebuf();
///
virtual ~basic_pipebuf();
///
bool is_open() const { return comm_.is_open(); }
///
buffer_type * open(char const * basename);
///
buffer_type * close();
/** Connect and you'll be informed when the input fd has received
* something new.
*/
typedef boost::function0<void> function_type;
function_type & input_received() const;
protected:
///
virtual int_type underflow();
///
virtual int_type overflow(int_type c = traits_type::eof());
// /** Tru64 Unix and g++-v3 both have a basic_streambuf::xsputn that does
// * the job perfectly.
// */
// virtual std::streamsize xsputn(char_type const * s, std::streamsize n);
///
virtual int_type sync();
private:
///
bool flush();
///
static int const bufsize_ = 1024;
/// get area
char_type gbuf_[bufsize_];
/// put area
char_type pbuf_[bufsize_];
/// the beast doing all the opening, closing, reading and writing
commT comm_;
};
template<typename bufferT, typename charT, typename traitsT>
class basic_pipestream : public std::basic_iostream<charT, traitsT> {
public:
/// Types
typedef charT char_type;
typedef traitsT traits_type;
typedef bufferT buffer_type;
typedef typename traits_type::int_type int_type;
typedef typename traits_type::pos_type pos_type;
typedef typename traits_type::off_type off_type;
typedef std::basic_iostream<char_type, traits_type> iostream_type;
private:
buffer_type buffer_;
public:
///
basic_pipestream()
: iostream_type(0)
{
init(&buffer_);
}
///
explicit basic_pipestream(char const * pipename)
: iostream_type(0)
{
init(&buffer_);
open(pipename);
}
///
buffer_type * rdbuf() const
{
return const_cast<buffer_type *>(&buffer_);
}
///
bool is_open() const
{
return buffer_.is_open();
}
///
void open(char const * pipename)
{
if (!buffer_.open(pipename))
setstate(std::ios_base::failbit);
}
///
void close()
{
if (!buffer_.close())
setstate(std::ios_base::failbit);
}
/** Connect and you'll be informed when the input fd has received
* something new.
*/
typedef boost::function0<void> function_type;
///
function_type & input_received() const
{
return buffer_.input_received();
}
};
namespace {
void print_error(const char * msg)
{
if (errno)
perror(msg);
errno = 0;
}
} // namespace anon
template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::basic_pipebuf()
{
setg(gbuf_, gbuf_, gbuf_);
setp(pbuf_, pbuf_ + bufsize_);
}
template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::~basic_pipebuf()
{
overflow();
comm_.close();
}
template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::buffer_type *
basic_pipebuf<commT, charT, traitsT>::open(char const * basename)
{
if (comm_.is_open() || !comm_.open(basename))
return 0;
return this;
}
template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::buffer_type *
basic_pipebuf<commT, charT, traitsT>::close()
{
if (!comm_.is_open() || !comm_.close())
return 0;
return this;
}
template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::function_type &
basic_pipebuf<commT, charT, traitsT>::input_received() const
{
return comm_.input_received();
}
template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::int_type
basic_pipebuf<commT, charT, traitsT>::underflow()
{
if (gptr() < egptr())
return gptr() != 0;
std::streamsize const n = comm_.read(gbuf_, bufsize_);
if (n == 0)
return traits_type::eof();
if (n == -1)
print_error("read");
// Reset egptr() to eback() + n
setg(eback(), eback(), eback() + n);
return gptr() != 0;
}
template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::int_type
basic_pipebuf<commT, charT, traitsT>::overflow(int_type c)
{
if (c == traits_type::eof())
return flush() ? 0 : c;
if (pptr() >= epptr())
if (!flush())
return traits_type::eof();
sputc(traits_type::to_char_type(c));
if (c == '\n' || pptr() >= epptr())
if (!flush())
return traits_type::eof();
return c;
}
template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::int_type
basic_pipebuf<commT, charT, traitsT>::sync()
{
return flush() ? 0 : -1;
}
template<typename commT, typename charT, typename traitsT>
bool
basic_pipebuf<commT, charT, traitsT>::flush()
{
if (pptr() == pbase())
// Nothing to flush
return true;
std::streamsize n;
for (std::streamsize len = pptr() - pbase(); len > 0; len -= n) {
n = comm_.write(pbase(), len);
if (n == -1) {
print_error("write");
setp(pbase(), epptr());
return false;
}
}
// Reset the current pointer to pbase()
setp(pbase(), epptr());
return true;
}
// template<typename commT, typename charT, typename traitsT>
// std::streamsize
// basic_pipebuf<commT, charT, traitsT>::xsputn(char_type const * p,
// std::streamsize n)
// {
// std::cerr << "basic_pipebuf::xsputn()" << std::endl;
// if (n <= 0)
// return 0;
// for (std::streamsize i = 0; i < n; i++, p++)
// if (traits_type::eof() ==
// ((*p == '\n') ? overflow(*p) : sputc(*p)))
// return i;
// return n;
// }
#include "pipecomm.h"
#endif // PIPESTREAM_H
// -*- C++ -*-
/**
* \file pipestreamfwd.h
* This file is part of LyX, the document processor.
* Licence details can be found in the file COPYING.
*
* \author Angus Leeming
*
* Full author contact details are available in file CREDITS
*/
#ifndef PIPESTREAMFWD_H
#define PIPESTREAMFWD_H
#ifdef __GNUG__
#pragma interface
#endif
#include <iosfwd>
template<typename commT,
typename charT, typename traitsT = std::char_traits<charT> >
class basic_pipebuf;
template<typename Buffer,
typename charT, typename traitsT = std::char_traits<charT> >
class basic_pipestream;
class PipeCommClient;
class PipeCommServer;
typedef basic_pipebuf<PipeCommClient, char> pipebuf_client;
typedef basic_pipebuf<PipeCommServer, char> pipebuf_server;
typedef basic_pipestream<pipebuf_client, char> pipestream_client;
typedef basic_pipestream<pipebuf_client, char> pipestream_server;
typedef basic_pipebuf<PipeCommClient, wchar_t> wpipebuf_client;
typedef basic_pipebuf<PipeCommServer, wchar_t> wpipebuf_server;
typedef basic_pipestream<pipebuf_client, wchar_t> wpipestream_client;
typedef basic_pipestream<pipebuf_client, wchar_t> wpipestream_server;
#endif // PIPESTREAMFWD_H
// -*- C++ -*-
#ifndef FDMONITOR_H
#define FDMONITOR_H
#ifdef __GNUG__
#pragma interface
#endif
#include <boost/scoped_ptr.hpp>
#include <boost/function/function0.hpp>
#include <string>
class FDMonitor {
public:
///
FDMonitor();
/// Define an empty d-tor out of line to keep boost::scoped_ptr happy.
~FDMonitor();
/** One FDMonitor instance can monitor only one file device at a
* time.
*/
void start(std::string const & file, int fd) const;
///
void stop() const;
///
bool running() const;
/** connect and you'll be informed when the file device is ready for
* reading.
*/
typedef boost::function0<void> function_type;
///
function_type & input_received() const;
/** The implementation is gui-specific, so use the Pimpl idiom to give
* the different frontends complete flexibility.
*/
class Impl;
private:
///
boost::scoped_ptr<Impl> const pimpl_;
};
#endif // FDMONITOR_H
#include "config.h"
#ifdef __GNUG__
#pragma implementation
#endif
#include "fdmonitor.h"
#include "FileInfo.h"
#include <boost/signals/trackable.hpp>
#include FORMS_H_LOCATION
using std::string;
struct FDMonitor::Impl : public boost::signals::trackable {
///
Impl() : fd_(-1), timestamp_(0) {}
///
void start(string const & file, int fd);
///
void stop();
///
void checkstatus();
///
boost::function0<void> fd_changed_;
///
string file_;
///
int fd_;
/// We use this to ascertain whether the file has really changed.
time_t timestamp_;
};
FDMonitor::FDMonitor()
: pimpl_(new Impl)
{}
FDMonitor::~FDMonitor()
{
pimpl_->stop();
}
void FDMonitor::start(string const & file, int fd) const
{
pimpl_->start(file, fd);
}
void FDMonitor::stop() const
{
pimpl_->stop();
}
bool FDMonitor::running() const
{
return pimpl_->fd_ != -1;
}
FDMonitor::function_type & FDMonitor::input_received() const
{
return pimpl_->fd_changed_;
}
namespace {
extern "C"
void C_read_callback(int, void * data)
{
FDMonitor::Impl * ptr = static_cast<FDMonitor::Impl *>(data);
ptr->checkstatus();
}
}
void FDMonitor::Impl::start(string const & file, int fd)
{
if (fd == -1)
return;
stop();
FileInfo finfo(file);
if (!finfo.isOK())
return;
fd_ = fd;
fl_add_io_callback(fd_, FL_READ, C_read_callback, this);
// Set this last, to avoid spurious callbacks from checkstatus
file_ = file;
}
void FDMonitor::Impl::stop()
{
if (fd_ != -1)
fl_remove_io_callback(fd_, FL_READ, C_read_callback);
file_.erase();
timestamp_ = 0;
fd_ = -1;
}
void FDMonitor::Impl::checkstatus()
{
if (file_.empty())
return;
bool changed = false;
FileInfo finfo(file_);
if (!finfo.isOK()) {
changed = timestamp_;
timestamp_ = 0;
} else {
time_t const new_timestamp = finfo.getModificationTime();
changed = new_timestamp != timestamp_;
if (changed)
timestamp_ = new_timestamp;
}
if (changed)
fd_changed_();
}