...and a clean interface to boot.

So Lars might say where's the socketstream, but this has been a 
real struggle and I'm now officially happy.

If anyone wants the wee test program that I used to develop this 
then shout and I'll put it somewhere accessible.

Angus
// -*- C++ -*-
/**
 * \file pipecomm.h
 * This file is part of LyX, the document processor.
 * Licence details can be found in the file COPYING.
 *
 * \author Alejandro Aguilar Sierra
 * \author Angus Leeming
 *
 * Full author contact details are available in file CREDITS
 *
 * This class manages the named pipes used for communication between programs.
 * Rewritten (simplified?) by Angus Leeming for use with pipestream.
 */

#ifndef PIPECOMM_H
#define PIPECOMM_H

#ifdef __GNUG__
#pragma interface
#endif

#include "fdmonitor.h"
#include <streambuf> // for ios_base::openmode

class PipeComm {
public:
	///
	enum Type {
		///
		SERVER,
		///
		CLIENT
	};

	///
	PipeComm(Type);
	///
	virtual ~PipeComm();

	/// Are we up and running?
	bool is_open() const;

	/** A wrapper for ::open()
	 *  Returns \c true on success.
	 */
	virtual bool open(char const * pipename_base) = 0;
	/** A wrapper for ::close().
	 *  Returns \c true on success.
	 */
	bool close();
	/** A wrapper for ::read().
	 *  Returns the number of bytes read or -1 on error.
	 */
	int read(char *, size_t);
	/** A wrapper for ::write().
	 *  Returns the number of bytes written or -1 on error.
	 */
	int write(char const *, size_t);

	/** Connect and you'll be informed when the input fd has received
	 *  something new.
	 */
	typedef boost::function0<void> function_type;
	///
	function_type & input_received() const;

	/// clean up in emergency
	void cleanUp();

protected:
	/// Open pipes.
	void openConnection();

	/// Close pipes.
	void closeConnection();

	/// This is -1 if not open.
	int infd;

	/// This is -1 if not open.
	int outfd;

	/// Base of pipename including path.
	char const * basename_;

	///
	Type type_;

	///
	FDMonitor monitor_;
};


class PipeCommClient : public PipeComm {
public:
	///
	PipeCommClient();
	///
	virtual bool open(char const * pipename_base);
};


class PipeCommServer : public PipeComm {
public:
	///
	PipeCommServer();
	///
	virtual bool open(char const * pipename_base);
};


#endif // PIPECOMM_H
/**
 * \file pipecomm.C
 * This file is part of LyX, the document processor.
 * Licence details can be found in the file COPYING.
 *
 * \author Alejandro Aguilar Sierra
 * \author Angus Leeming
 *
 * Full author contact details are available in file CREDITS
 *
 * This class manages the named pipes used for communication between programs.
 * Rewritten (simplified?) by Angus Leeming for use with pipestream.
 */

#include "config.h"

#ifdef __GNUG__
#pragma implementation
#endif

#include "pipecomm.h"
#include "debug.h"
#include "lyxalgo.h"
#include "support.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>

#include <boost/scoped_array.hpp>
#include <string>

using lyx::copy_if;

using std::endl;
using std::ios_base;
using std::string;

namespace {

std::string const inname(string const & base, bool server)
{
	return base + (server ? ".in" : ".out");
}


std::string const outname(string const & base, bool server)
{
	return base + (server ? ".out" : ".in");
}


bool pipeExists(string const & filename)
{
	bool const exists = ::access(filename.c_str(), F_OK) == 0;

	if (exists) {
		lyxerr << "PipeComm: Pipe " << filename << " already exists.\n"
		       << "If no other program using this pipe is active, "
			"please delete it by hand and try again." << endl;
	}

	return exists;
}


bool createPipe(string const & filename)
{
	bool const success = ::mkfifo(filename.c_str(), 0600) == 0;

	if (!success) {
		lyxerr << "PipeComm: Could not create pipe " << filename << '\n'
		       << strerror(errno) << endl;
	}

	return success;
}


bool removePipe(string const & filename)
{
	bool const success = lyx::unlink(filename) == 0;

	if (!success) {
		lyxerr << "PipeComm: Could not remove pipe " << filename << '\n'
		       << strerror(errno) << endl;
	}

	return success;
}


int openPipe(string const & filename, bool write)
{
	int fd = ::open(filename.c_str(), 
			write ? O_RDWR : (O_RDONLY|O_NONBLOCK));

	if (fd < 0) {
		fd = -1;
		lyxerr << "PipeComm: Could not open pipe " << filename << '\n'
		       << strerror(errno) << endl;
	}

	return fd;
}


bool closePipe(int fd, string const & filename)
{
	if (fd < 0)
		return true;

	bool const success = ::close(fd) == 0;

	if (!success) {
		lyxerr << "PipeComm: Could not close pipe " << filename << '\n'
		       << strerror(errno)
		       << endl;
	}

	return success;
}


bool setNonblocking(int fd, string const & filename)
{
	bool const success = fcntl(fd, F_SETFL, O_NONBLOCK) != -1;

	if (!success) {
		lyxerr << "PipeComm: Could not set non-blocking status for "
		       << filename << '\n'
		       << strerror(errno) << endl;
	}

	return success;
}

} // namespace anon


PipeComm::PipeComm(Type type)
	: infd(-1),
	  outfd(-1),
	  basename_(0),
	  type_(type)
{}


PipeComm::~PipeComm()
{
	cleanUp();
}


bool PipeComm::is_open() const
{
	return infd != -1 && outfd != -1;
}


bool PipeComm::close()
{
	if (!is_open())
		return false;
	closeConnection();
	return !is_open();
}


PipeComm::function_type & PipeComm::input_received() const
{
	return monitor_.input_received();
}


namespace {

struct UnixCopy {
	bool operator()(char c) { return c != '\r'; }
};
  
} // namespace anon


int PipeComm::read(char * buffer, size_t buffersize)
{
	if (!is_open()) {
		lyxerr << "PipeComm: Pipes are closed. Cannot read." << endl;
		return 0;
	}

	if (!buffersize) {
		lyxerr[Debug::LYXSERVER]
			<< "PipeComm: Request to read 0 bytes ignored."
			<< endl;
		return 0;
	}

	boost::scoped_array<char> tmpbuf(new char[buffersize]);
	int const status = ::read(infd, tmpbuf.get(), buffersize - 1);

	if (status > 0) {
		tmpbuf[status]= '\0'; // turn it into a c string
		char * begin = tmpbuf.get();
		char * end   = begin + status;
		copy_if(begin, end, buffer, UnixCopy());
		return int(::strlen(buffer));
	}

	if (errno != EAGAIN) {
		lyxerr << "PipeComm: truncated command. Resetting connection"
		       << endl;

		// reset connection			
		closeConnection();
		openConnection();
	}

	return 0;
}


int PipeComm::write(char const * cmsg, size_t csize)
{
	if (!is_open()) {
		lyxerr << "PipeComm: Pipes are closed. Cannot write." << endl;
		return -1;
	}

	if (!csize) {
		lyxerr[Debug::LYXSERVER]
			<< "PipeComm: Request to write empty string ignored."
			<< endl;
		return 0;
	}

	int const nbytes = ::write(outfd, cmsg, csize);
	if (nbytes < 0) {
		lyxerr[Debug::LYXSERVER]
			<< "PipeComm: Error writing message: "
			<< string(cmsg, csize)
			<< '\n' << strerror(errno)
			<< "\nPipeComm: Resetting connection" << endl;

		closeConnection();
		openConnection();
	}

	return nbytes;
}


void PipeComm::cleanUp()
{
	if (!basename_) {
		return;
	}

	lyxerr[Debug::LYXSERVER] << "PipeComm: cleaning up!" << endl;

	string const in  = inname(basename_, type_ == SERVER);
	string const out = outname(basename_, type_ == SERVER);

	if (infd != -1) {
		closePipe(infd, in);
		infd = -1;
	}

	if (outfd != -1) {
		closePipe(outfd, out);
		outfd = -1;
	}

	if (type_ == SERVER) {
		removePipe(in);
		removePipe(out);
	}
	
	basename_ = 0;
}


void PipeComm::openConnection()
{
	lyxerr[Debug::LYXSERVER] << "PipeComm: Opening connection." << endl;

	if (monitor_.running())
		monitor_.stop();

	// If we are up, that's an error
	if (is_open()) {
		lyxerr << "PipeComm: Already connected." << endl;
		return;
	}

	if (!basename_) {
		lyxerr[Debug::LYXSERVER]
			<< "PipeComm: Server is disabled, nothing to do."
			<< endl;
		return;
	}

	string const in  = inname(basename_, type_ == SERVER);

	if ((infd = openPipe(in, false)) == -1) {
		cleanUp();
		return;
	}

	string const out = outname(basename_, type_ == SERVER);

	if ((outfd = openPipe(out, true)) == -1) {
		cleanUp();
		return;
	}

	if (!setNonblocking(outfd, out)) {
		cleanUp();
		return;
	}

	monitor_.start(inname(basename_, type_ == SERVER), infd);

	lyxerr[Debug::LYXSERVER]
		<< "PipeComm: Connection established." << endl;
}


void PipeComm::closeConnection()
{
	lyxerr[Debug::LYXSERVER] << "PipeComm: Closing connection." << endl;

	if (monitor_.running())
		monitor_.stop();

	if (!basename_) {
		lyxerr[Debug::LYXSERVER]
			<< "PipeComm: Server is disabled, nothing to do."
			<< endl;
		return;
	}

	if (!is_open()) {
		lyxerr[Debug::LYXSERVER]
			<< "PipeComm: Already disconnected." << endl;
		return;
	}

	closePipe(infd,  inname(basename_, type_ == SERVER));
	closePipe(outfd, outname(basename_, type_ == SERVER));

	infd  = -1;
	outfd = -1;
}


PipeCommClient::PipeCommClient()
	: PipeComm(CLIENT)
{}


bool PipeCommClient::open(char const * pipename_base)
{
	if (is_open())
		return false;

	basename_ = pipename_base;
	openConnection();
	return is_open();
}


PipeCommServer::PipeCommServer()
	: PipeComm(SERVER)
{}


bool PipeCommServer::open(char const * pipename_base)
{
	if (is_open())
		return false;

	basename_ = pipename_base;

	string const in  = inname(basename_, true);
	string const out = outname(basename_, true);

	if (pipeExists(in) || pipeExists(out)) {
		basename_ = 0;
		return false;
	}

	if (!createPipe(in)) {
		basename_ = 0;
		return false;
	}

	if (!createPipe(out)) {
		basename_ = 0;
		removePipe(in);
		return false;
	}

	openConnection();
	return is_open();
}
// -*- C++ -*-
/**
 * \file pipestream.h
 * This file is part of LyX, the document processor.
 * Licence details can be found in the file COPYING.
 *
 * \author Angus Leeming
 *
 * Full author contact details are available in file CREDITS
 *
 * This code has been adapted from pipestream.C by Gnanasekaran Swaminathan.
 * Below is the original copyright notice:
 *
 * Copyright (C) 1992,1993,1994 Gnanasekaran Swaminathan <[EMAIL PROTECTED]>
 *
 * Permission is granted to use at your own risk and distribute this software
 * in source and binary forms provided the above copyright
 * notice and this paragraph are preserved on all copies.
 * This software is provided "as is" with no express or implied warranty.
 */

#ifndef PIPESTREAM_H
#define PIPESTREAM_H

#ifdef __GNUG__
#pragma interface
#endif

#include "pipestreamfwd.h"

#include <boost/function/function0.hpp>

#include <cerrno>
#include <streambuf>
#include <iostream>

template<typename commT, typename charT, typename traitsT>
class basic_pipebuf : public std::basic_streambuf<charT, traitsT> {
public:
	///
	typedef commT    comm_type;
	typedef charT    char_type;
	typedef traitsT  traits_type;
   	typedef basic_pipebuf<comm_type, char_type, traits_type> buffer_type;
   
	///
	basic_pipebuf();
	///
	virtual ~basic_pipebuf();

	///
	bool is_open() const { return comm_.is_open(); }
	///
	buffer_type * open(char const * basename);
	///
	buffer_type * close();

	/** Connect and you'll be informed when the input fd has received
	 *  something new.
	 */
	typedef boost::function0<void> function_type;
	function_type & input_received() const;

protected:
	///
	virtual int_type underflow();
	///
	virtual int_type overflow(int_type c = traits_type::eof());
//  	/** Tru64 Unix and g++-v3 both have a basic_streambuf::xsputn that does
//  	 *  the job perfectly.
//  	 */
//  	virtual std::streamsize xsputn(char_type const * s, std::streamsize n);
	///
	virtual int_type sync();

private:
	///
	bool flush();
	///
	static int const bufsize_ = 1024;
	/// get area
	char_type gbuf_[bufsize_];
	/// put area
	char_type pbuf_[bufsize_];
	/// the beast doing all the opening, closing, reading and writing
	commT comm_;
};


template<typename bufferT, typename charT, typename traitsT>
class basic_pipestream : public std::basic_iostream<charT, traitsT> {
public:
	/// Types
	typedef charT                           char_type;
	typedef traitsT                         traits_type;
	typedef bufferT                         buffer_type;
	typedef typename traits_type::int_type  int_type;
	typedef typename traits_type::pos_type  pos_type;
	typedef typename traits_type::off_type  off_type;

	typedef std::basic_iostream<char_type, traits_type>  iostream_type;
private:
	buffer_type buffer_;

public:
	///
	basic_pipestream()
		: iostream_type(0)
	{
		init(&buffer_);
	}
	///
	explicit basic_pipestream(char const * pipename)
		: iostream_type(0)
	{
		init(&buffer_);
		open(pipename);
	}

	///
	buffer_type * rdbuf() const
	{
		return const_cast<buffer_type *>(&buffer_);
	}
	///
	bool is_open() const
	{
		return buffer_.is_open();
	}
	///
	void open(char const * pipename)
	{ 
		if (!buffer_.open(pipename))
			setstate(std::ios_base::failbit); 
	}
	///
	void close()
	{ 
		if (!buffer_.close())
	  		setstate(std::ios_base::failbit);	
	}
	/** Connect and you'll be informed when the input fd has received
	 *  something new.
	 */
	typedef boost::function0<void> function_type;
	///
	function_type & input_received() const
	{
		return buffer_.input_received();
	}
};


namespace {

void print_error(const char * msg)
{
	if (errno)
		perror(msg);
	errno = 0;
}

} // namespace anon


template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::basic_pipebuf()
{
	setg(gbuf_, gbuf_, gbuf_);
	setp(pbuf_, pbuf_ + bufsize_);
}


template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::~basic_pipebuf()
{
	overflow();
	comm_.close();
}


template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::buffer_type *
basic_pipebuf<commT, charT, traitsT>::open(char const * basename)
{
	if (comm_.is_open() || !comm_.open(basename))
		return 0;
	return this;
}


template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::buffer_type *
basic_pipebuf<commT, charT, traitsT>::close()
{
	if (!comm_.is_open() || !comm_.close())
		return 0;
	return this;
}


template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::function_type &
basic_pipebuf<commT, charT, traitsT>::input_received() const
{
	return comm_.input_received();
}


template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::int_type
basic_pipebuf<commT, charT, traitsT>::underflow()
{
	if (gptr() < egptr())
		return gptr() != 0;

	std::streamsize const n = comm_.read(gbuf_, bufsize_);
	if (n == 0)
  		return traits_type::eof();
	if (n == -1)
		print_error("read");

	// Reset egptr() to eback() + n
	setg(eback(), eback(), eback() + n);
	return gptr() != 0;
}


template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::int_type
basic_pipebuf<commT, charT, traitsT>::overflow(int_type c)
{
	if (c == traits_type::eof())
		return flush() ? 0 : c;

	if (pptr() >= epptr())
		if (!flush())
			return traits_type::eof();

	sputc(traits_type::to_char_type(c));

	if (c == '\n' || pptr() >= epptr())
		if (!flush())
			return traits_type::eof();
	return c;
}


template<typename commT, typename charT, typename traitsT>
basic_pipebuf<commT, charT, traitsT>::int_type
basic_pipebuf<commT, charT, traitsT>::sync()
{
	return flush() ? 0 : -1;
}
		

template<typename commT, typename charT, typename traitsT>
bool
basic_pipebuf<commT, charT, traitsT>::flush()
{
	if (pptr() == pbase())
		// Nothing to flush
		return true;

	std::streamsize n;
	for (std::streamsize len = pptr() - pbase(); len > 0; len -= n) {
		n = comm_.write(pbase(), len);
		if (n == -1) {
			print_error("write");
			setp(pbase(), epptr());
			return false;
		}
	}

	// Reset the current pointer to pbase()
	setp(pbase(), epptr());
	return true;
}


//  template<typename commT, typename charT, typename traitsT>
//  std::streamsize
//  basic_pipebuf<commT, charT, traitsT>::xsputn(char_type const * p,
//  					   std::streamsize n)
//  {
//  	std::cerr << "basic_pipebuf::xsputn()" << std::endl;
//  	if (n <= 0)
//  		return 0;
//  	for (std::streamsize i = 0; i < n; i++, p++)
//  		if (traits_type::eof() ==
//  		    ((*p == '\n') ? overflow(*p) : sputc(*p)))
//  			return i;
//  	return n;
//  }

#include "pipecomm.h"

#endif // PIPESTREAM_H
// -*- C++ -*-
/**
 * \file pipestreamfwd.h
 * This file is part of LyX, the document processor.
 * Licence details can be found in the file COPYING.
 *
 * \author Angus Leeming
 *
 * Full author contact details are available in file CREDITS
 */

#ifndef PIPESTREAMFWD_H
#define PIPESTREAMFWD_H

#ifdef __GNUG__
#pragma interface
#endif

#include <iosfwd>

template<typename commT,
	typename charT, typename traitsT = std::char_traits<charT> >
class basic_pipebuf;

template<typename Buffer,
	typename charT, typename traitsT = std::char_traits<charT> >
class basic_pipestream;

class PipeCommClient;
class PipeCommServer;

typedef basic_pipebuf<PipeCommClient, char> pipebuf_client;
typedef basic_pipebuf<PipeCommServer, char> pipebuf_server;

typedef basic_pipestream<pipebuf_client, char> pipestream_client;
typedef basic_pipestream<pipebuf_client, char> pipestream_server;

typedef basic_pipebuf<PipeCommClient, wchar_t> wpipebuf_client;
typedef basic_pipebuf<PipeCommServer, wchar_t> wpipebuf_server;

typedef basic_pipestream<pipebuf_client, wchar_t> wpipestream_client;
typedef basic_pipestream<pipebuf_client, wchar_t> wpipestream_server;

#endif // PIPESTREAMFWD_H
// -*- C++ -*-
#ifndef FDMONITOR_H
#define FDMONITOR_H

#ifdef __GNUG__
#pragma interface
#endif

#include <boost/scoped_ptr.hpp>
#include <boost/function/function0.hpp>

#include <string>

class FDMonitor {
public:
	///
	FDMonitor();
	/// Define an empty d-tor out of line to keep boost::scoped_ptr happy.
	~FDMonitor();

	/** One FDMonitor instance can monitor only one file device at a
	 *  time.
	 */
	void start(std::string const & file, int fd) const;
	///
	void stop() const;
	///
	bool running() const;

	/** connect and you'll be informed when the file device is ready for
	 *  reading.
	 */
	typedef boost::function0<void> function_type;
	///
        function_type & input_received() const;

	/** The implementation is gui-specific, so use the Pimpl idiom to give
	 *  the different frontends complete flexibility.
	 */
        class Impl;

private:
	///
        boost::scoped_ptr<Impl> const pimpl_;
};


#endif // FDMONITOR_H
#include "config.h"

#ifdef __GNUG__
#pragma implementation
#endif

#include "fdmonitor.h"
#include "FileInfo.h"

#include <boost/signals/trackable.hpp>

#include FORMS_H_LOCATION

using std::string;

struct FDMonitor::Impl : public boost::signals::trackable {
	///
	Impl() : fd_(-1), timestamp_(0) {}
	///
	void start(string const & file, int fd);
	///
	void stop();
	///
	void checkstatus();
	///
	boost::function0<void> fd_changed_;
	///
	string file_;
	///
	int fd_;
	/// We use this to ascertain whether the file has really changed.
	time_t timestamp_;
};


FDMonitor::FDMonitor()
	: pimpl_(new Impl)
{}


FDMonitor::~FDMonitor()
{
	pimpl_->stop();
}


void FDMonitor::start(string const & file, int fd) const
{
	pimpl_->start(file, fd);
}


void FDMonitor::stop() const
{
	pimpl_->stop();
}


bool FDMonitor::running() const
{
	return pimpl_->fd_ != -1;
}


FDMonitor::function_type & FDMonitor::input_received() const
{
	return pimpl_->fd_changed_;
}


namespace {

extern "C"
void C_read_callback(int, void * data)
{
	FDMonitor::Impl * ptr = static_cast<FDMonitor::Impl *>(data);
	ptr->checkstatus();
}

}

void FDMonitor::Impl::start(string const & file, int fd)
{
	if (fd == -1)
		return;

	stop();

	FileInfo finfo(file);
	if (!finfo.isOK())
		return;

	fd_ = fd;

	fl_add_io_callback(fd_, FL_READ, C_read_callback, this);
	// Set this last, to avoid spurious callbacks from checkstatus
	file_ = file;
}


void FDMonitor::Impl::stop()
{
	if (fd_ != -1)
		fl_remove_io_callback(fd_, FL_READ, C_read_callback);
	file_.erase();
	timestamp_ = 0;
	fd_ = -1;
}


void FDMonitor::Impl::checkstatus()
{
	if (file_.empty())
		return;

	bool changed = false;

	FileInfo finfo(file_);
	if (!finfo.isOK()) {
		changed = timestamp_;
		timestamp_ = 0;

	} else {
		time_t const new_timestamp = finfo.getModificationTime();
		changed = new_timestamp != timestamp_;
		if (changed)
			timestamp_ = new_timestamp;
	}
	if (changed)
		fd_changed_();
}

Reply via email to