On Sun, Apr 14, 2002 at 01:16:38PM +0400, Alexander V. Lukyanov wrote:
> > It could also simply be removed.  Both gzip and bzip2 permit concatenating
> > files.  (In other words, remove for_each and make zcat and bz2cat regular
> > filters.)
> 
> Very nice. I did not know that.

I've removed the for_each code from OutputJob, and that simplified things a
good deal, though I'm still not quite satisfied with it.  I don't think
either FileCopy can be removed; they're fundamentally needed.  I've
hidden the pipe in one of the constructors, adding a flag to FDStream to
close the fd.

FileCopyPeerOutputJob is a stub to quickly allow "cat" to use OutputJob.
It's a little silly; data goes from the source of the cat, into
FileCopyPeerOutputJob, which then sends it to an OutputJob--which
promptly sends it back into a FileCopyPeer.  It's extremely simple code,
but it also sends the data through extra buffers.  I wrote it so that
"cat" would work; it'd be nice if there were a more elegant way to do this.
I don't know if this is a performance issue in practice.  (Perhaps some
way for a FileCopy to hook directly to the input FileCopy in the
OutputJob; I'd rather not do that unless it's actually needed.)

I'll attach my current OutputJob, so you can see where it's at now.

-- 
Glenn Maynard
/* lftp and utils
 *
 * Copyright (c) 2002 by Alexander V. Lukyanov ([EMAIL PROTECTED])
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

/* Usage notes: 
 * 
 * Put and Format always accept the data passed to them; they never
 * postpone a write.  When sending large amounts of data, call Full()
 * before writing and stop producing output while it returns true.
 *
 * This is useful for jobs with a lot of output, like "cat".  The check can
 * be applied selectively, where convenient.  For example, a job which
 * outputs a line of formatted text, followed by the contents of a file,
 * can send the first line unconditionally, then throttle the file on Full().
 * 
 * Call PreFilter() to add a filter to the beginning of the chain; these
 * filters are initialized only once for all data.  For example,
 * PreFilter("wc -l")
 *
 */

/* 
 * Implementation notes:
 * Background things we can't get around: 
 * We must buffer (via FileCopy) output to a filter, since it might block.
 *
 * We must buffer the output from the filter to an output FileCopyPeer (i.e.
 * a URL), for the same reason.
 *
 * So, we're stuck with having two FileCopys.  (One to send, one to filter.)
 *
 * In some cases, we only need one: if the output is an FD, the filter can
 * hook up directly and we can forget about that stage.
 *
 * In the case where we're outputting to a path (or URL), we set up a pipe
 * to it and pretend we're just outputting to a file; this simplifies things
 * significantly.  This means in the simple case of having no filters at
 * all, writing to a URL or file, we send the data an extra time through
 * a FileCopy and a pipe.  That's a bit inefficient, but that's
 * "cat file1 > file2"; that's normally done with "get file1 -o file2", so
 * this shouldn't happen often.
 *
 * It's very important that if the output is stdout, any filters point directly
 * at it, not through an extra copy: a pager, for example, will expect the output
 * to be a TTY.
 *
 */
#include <config.h>

#include "OutputJob.h"
#include "ArgV.h"
#include "FileCopy.h"
#include "CopyJob.h"
#include "url.h"
#include "misc.h"
#include "StatusLine.h"
#include "LocalAccess.h"

#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <sys/ioctl.h>
#include <fcntl.h>

#define super Job

/* Lazily build the copy chain; Put() and Format() call this before writing.
 *
 * If a filter is set, output_fd is replaced by an OutputFilter that runs the
 * filter command and writes into the previous output_fd.  A FileCopy is then
 * created from a plain GET FileCopyPeer (the buffer Put()/Format() write
 * into) to a PUT peer on output_fd, and wrapped in the "input" CopyJob.  If
 * no separate output CopyJob exists (the FDStream constructor case), input
 * also serves as output.
 *
 * Does nothing if an error was already recorded or if already initialized.
 */
void OutputJob::InitCopy()
{
   if(error)
      return;
   
   if(initialized)
      return;

   initialized=true;
      
   if(filter)
   {
      /* Create the global filter: */
      /* NOTE(review): DeleteSecondaryStream() presumably makes the filter
       * own (and delete) the previous output_fd -- TODO confirm. */
      OutputFilter *global = new OutputFilter(filter, output_fd);
      global->DeleteSecondaryStream();
      output_fd=global;
   }

   /* Use a FileCopy to buffer our output to the filter: */
   /* output_fd stays owned by OutputJob (deleted in the destructor), so the
    * peer must not delete it. */
   FileCopyPeerFDStream *out = new FileCopyPeerFDStream(output_fd, FileCopyPeer::PUT);
   out->DontDeleteStream();
   
   FileCopy *input_fc = FileCopy::New(new FileCopyPeer(FileCopyPeer::GET), out, false);

   if(!fail_if_broken)
      input_fc->DontFailIfBroken();

   char *buf = xasprintf(_("%s (filter)"), a0);
   input=new CopyJob(input_fc, buf, filter?filter:a0);
   xfree(buf);

   /* Local (FDStream) output: a single CopyJob does everything. */
   if(!output)
      output=input;

   /* The data is a stream, not a file: no date or size to carry over. */
   input->SetParentFg(this);
   InputPeer()->SetDate(NO_DATE);
   InputPeer()->SetSize(NO_SIZE);
   input->GetCopy()->DontCopyDate();
   input->NoStatus();

   if(input != output)
   {
      output->SetParentFg(this);
      OutputPeer()->SetDate(NO_DATE);
      OutputPeer()->SetSize(NO_SIZE);
      output->GetCopy()->DontCopyDate();
      output->NoStatus();
   }

   /* On stdout, line buffering lets ShowStatusLine() draw between lines. */
   if(is_stdout)
   {
      output->ClearStatusOnWrite();
      output->GetCopy()->LineBuffered();
   }

   /* NOTE(review): presumably asks the scheduler to run this task again
    * immediately -- confirm against SMTask::Timeout. */
   Timeout(0);
}

void OutputJob::Init(const char *_a0)
{
   input=output=0;
   filter=0;
   initialized=false;
   error=false;
   no_status=false;
   a0=xstrdup(_a0);
   last.Set(0,0);
   is_stdout=false;
   fail_if_broken=true;
   output_fd=0;
}

/* Local (fd) output.
 *
 * output_ is the stream to write to; 0 selects standard output.  The
 * stream is owned by this job afterwards (deleted in the destructor).
 * a0 is the command name used in messages. */
OutputJob::OutputJob(FDStream *output_, const char *a0):
   inter(1)
{
   Init(a0);

   if(output_ == 0)
   {
      output_fd=new FDStream(1,"<stdout>");
   }
   else
   {
      output_fd=output_;
      /* Some legitimate uses produce a broken pipe condition (cat|head).
       * TODO: once actual piping uses OutputJob, set this only when really
       * doing a pipe, so cat>file can produce broken pipe. */
      fail_if_broken=false;
   }

   is_stdout=output_fd->usesfd(1);

   /* Local output never contributes to the parent's status line. */
   no_status=true;

   /* Fail as early as possible if the output can't be opened, so the
    * parent doesn't start anything expensive (like downloading a file). */
   const bool unusable = (output_fd->getfd() == -1);
   if(unusable)
   {
      eprintf("%s: %s\n", a0, output_fd->error_text);
      error=true;
   }
}

/* Output to a path or URL.
 *
 * A pipe is created: its write end becomes output_fd, so the rest of the
 * class treats this exactly like the FDStream constructor, and its read end
 * feeds a second CopyJob ("output") that stores the data at path through fa.
 * If fa is 0, a new LocalAccess is used.  On pipe() failure the error is
 * printed, error is set and output_fd is a placeholder stdout stream.
 */
OutputJob::OutputJob(const char *path, const char *a0, FileAccess *fa):
   inter(1)
{
   Init(a0);

   /* Set up a pipe sending data at the peer, so we can act like the FDStream
    * constructor. */
   int filter_pipe[2];

   if(pipe(filter_pipe) == -1) {
      /* FIXME: This can be retryable. */
      eprintf("%s: %s\n", a0, strerror(errno));
      error=true;
      /* This won't actually be written to, since error is set; we must set
       * it to something, though. */
      output_fd=new FDStream(1, "<stdout>");
      return;
   }

   /* NOTE(review): reuse is only set when we created the session here;
    * presumably it lets the peer dispose of that session when done --
    * TODO confirm against FileCopyPeerFA::New. */
   bool reuse = false;
   if(!fa)
   {
      fa = new LocalAccess;
      reuse = true;
   }

   FileCopyPeerFA *dst_peer = FileCopyPeerFA::New(fa, path, FA::STORE, reuse);

   /* Status only for remote outputs. */
   if(!strcmp(dst_peer->GetProto(), "file"))
      no_status=true;

   /* Both pipe ends are driven by FileCopy peers, hence non-blocking.
    * NOTE(review): O_NONBLOCK belongs to the open file description, so a
    * filter process that later writes to filter_pipe[1] would presumably
    * see a non-blocking stdout -- confirm the filter code handles this. */
   fcntl(filter_pipe[0],F_SETFL,O_NONBLOCK);
   fcntl(filter_pipe[1],F_SETFL,O_NONBLOCK);

   /* The output of the pipe (0) goes to the output FileCopy. */ 
   FDStream *pipe_output = new FDStream(filter_pipe[0],"<filter-out>");

   FileCopy *output_fc=FileCopy::New(new FileCopyPeerFDStream(pipe_output, FileCopyPeer::GET), dst_peer,false);
   output=new CopyJob(output_fc, path, a0);

   /* The write end (1) is what InitCopy() and the filters write into. */
   output_fd=new FDStream(filter_pipe[1],"<filter-in>");
   
   /* These streams wrap raw pipe fds; make them close the fds themselves. */
   pipe_output->CloseFD();
   output_fd->CloseFD();
}

/* Tear the job down: move to the background, tell any filter processes to
 * terminate, then release the child CopyJobs and everything we own. */
OutputJob::~OutputJob()
{
   Bg();
   AcceptSig(SIGTERM);

   /* output may alias input; delete it only once. */
   Delete(input);
   if(output != input)
      Delete(output);

   /* Owned stream (possibly an OutputFilter wrapping the original one). */
   delete output_fd;

   /* Owned strings. */
   xfree(a0);
   xfree(filter);
}

/* Re-read the settings this job depends on.  r names the setting that
 * changed, or is 0 to re-read all of them. */
void OutputJob::Reconfig(const char *r)
{
   const bool affects_us = (r == 0) || (strcmp(r,"cmd:status-interval") == 0);
   if(!affects_us)
      return;

   inter=TimeInterval((const char*)ResMgr::Query("cmd:status-interval",0));
}

/* Decide whether the owning job may draw its status line right now.
 * Returns true to allow it; when it returns true for line-buffered stdout,
 * the output copy is paused so the status line is cleared before more data
 * is written. */
bool OutputJob::ShowStatusLine()
{
   /* Output gone, or not on stdout: nothing of ours to stomp on. */
   if(output == 0 || !is_stdout)
      return true;

   /* A filter owns the terminal and we know nothing about what it prints
    * (only URL/file outputs are ours to describe, and those aren't stdout). */
   if(IsFiltered())
      return false;

   /* Not line buffered: defer to the output CopyJob's own judgement. */
   if(!output->GetCopy()->IsLineBuffered())
      return output->HasStatus();

   /* Line buffered, so a status line can't land mid-line.  Still hold it
    * back while output is bursting (something written within the last
    * status interval) to avoid flicker.  Ideally this would track the last
    * time the output peer sent data instead. */
   const bool recently_written = (now - last < inter);
   if(recently_written)
      return false;

   last = now;

   /* Pause output so the FileCopy clears the StatusLine before writing. */
   output->GetCopy()->AllowWrite(false);
   return true;
}

/* Status text contributed to the parent job's status line; "" when there
 * is nothing worth showing. */
const char *OutputJob::Status(const StatusLine *s)
{
   /* Status disabled (local output), or no data yet -- the output doesn't
    * start connecting until the first write, so there's nothing to say. */
   if(no_status || !initialized)
      return "";

   /* The output CopyJob is the one connecting to a host, if any. */
   return output->Status(s,true);
}

/* Signal end of data. */
void OutputJob::PutEOF()
{
   /* Writing an empty block runs InitCopy(), so the input->output path
    * always exists before EOF is delivered. */
   Put("", 0);

   if(!InputPeer())
      return;
   InputPeer()->PutEOF();
}

/* Prepend newfilter to the filter chain: the result is "newfilter | old"
 * when a filter already exists, or just newfilter otherwise. */
void OutputJob::PreFilter(const char *newfilter)
{
   if(filter)
   {
      char *chained = xasprintf("%s | %s", newfilter, filter);
      SetFilter(chained);
      xfree(chained);
   }
   else
   {
      SetFilter(newfilter);
   }
}

/* Replace the filter command line with a private copy of newfilter.
 * Only takes effect if called before the first write (InitCopy()). */
void OutputJob::SetFilter(const char *newfilter)
{
   /* Duplicate before freeing: newfilter may point into the current
    * filter string, which the original free-then-dup order would read
    * after it was freed. */
   char *fresh = xstrdup(newfilter);
   xfree(filter);
   filter = fresh;
}

/* Terminal width of the output, or -1 when the output is filtered or
 * isn't file descriptor 1. */
int OutputJob::GetWidth() const
{
   const bool direct_stdout = !IsFiltered() && output_fd->getfd() == 1;
   return direct_stdout ? fd_width(1) : -1;
}

/* True only when output goes unfiltered to file descriptor 1 and that
 * descriptor is a terminal. */
bool OutputJob::IsTTY() const
{
   if(!IsFiltered() && output_fd->getfd() == 1)
      return isatty(1);
   return false;
}

/* The GET side of the input CopyJob -- the buffer Put()/Format() write
 * into.  0 before InitCopy() has run. */
FileCopyPeer *OutputJob::InputPeer() const
{
   if(input == 0)
      return 0;
   return input->GetGet();
}

/* The PUT side of the output CopyJob -- the peer doing the final write.
 * 0 when there is no output CopyJob yet. */
FileCopyPeer *OutputJob::OutputPeer() const
{
   if(output == 0)
      return 0;
   return output->GetPut();
}

/* Finished when an error was recorded, when the output stream broke, or
 * when the output CopyJob has completed. */
int OutputJob::Done()
{
   if(Error())
      return true;

   /* A broken output always ends the job, whether or not it counts as an
    * error (see fail_if_broken). */
   if(output_fd->broken())
      return true;

   /* Nothing has started before the first write, so we can't be done. */
   return initialized && output && output->Done();
}

/* Scheduler step.  The only work here is to create FgData for the output
 * stream's process group once one exists (NOTE(review): presumably so the
 * filter processes follow this job's foreground state -- confirm). */
int OutputJob::Do()
{
   if(fg_data || !output_fd || !output_fd->GetProcGroup())
      return STALL;

   fg_data=new FgData(output_fd->GetProcGroup(),fg);
   return MOVED;
}

/* Don't register errors until they're actually printed by
 * the sub-job (ie. it's also Done()). */
bool OutputJob::Error()
{
   if(error)
      return true;
   if(input && input->Error() && input->Done())
      error=true;
   if(output && input != output && output->Error() && output->Done())
      error=true;
   if(fail_if_broken && output_fd->broken())
      error=true;
   return error;
}

/* Bring this job and its CopyJobs to the foreground (output only when it
 * is a separate job from input). */
void OutputJob::Fg()
{
   super::Fg();

   if(input)
      input->Fg();

   const bool separate = output && output != input;
   if(separate)
      output->Fg();
}

/* Move to the background, in the reverse order of Fg(): the separate
 * output job first, then input, then this job. */
void OutputJob::Bg()
{
   const bool separate = output && output != input;
   if(separate)
      output->Bg();

   if(input)
      input->Bg();

   super::Bg();
}

/* Suspend the child CopyJobs, then this job. */
void OutputJob::Suspend()
{
   if(input)
      input->Suspend();

   const bool separate = output && output != input;
   if(separate)
      output->Suspend();

   super::Suspend();
}

/* Resume the child CopyJobs, then this job. */
void OutputJob::Resume()
{
   if(input)
      input->Resume();

   const bool separate = output && output != input;
   if(separate)
      output->Resume();

   super::Resume();
}

/* True when 64 KiB or more is queued across all peers of the copy chain.
 * Purely advisory: Put()/Format() keep accepting data regardless. */
bool OutputJob::Full()
{
   /* Nothing set up yet, so nothing can be queued. */
   if(!input)
      return false;

   /* Sum the buffers ourselves.  Checking whether the Get end is suspended
    * would be nicer, but FileCopy only suspends it after it actually tries
    * to send something. */
   int queued = 0;

   if(input->GetPut())
      queued += input->GetPut()->Buffered();
   if(input->GetGet())
      queued += input->GetGet()->Buffered();

   if(output != input)
   {
      if(output->GetPut())
         queued += output->GetPut()->Buffered();
      if(output->GetGet())
         queued += output->GetGet()->Buffered();
   }

   const int high_water = 0x10000;
   return queued >= high_water;
}

/* Queue size bytes of buf for output, building the copy chain on first use.
 * Silently does nothing if the chain couldn't be built (error set).
 *
 * We'll actually go over the buffer limit here; that's OK; it's not a
 * strict value.  (It's not convenient to prevent that completely with
 * Format(), either.) */
void OutputJob::Put(const char *buf,int size)
{
   InitCopy();
   if(!InputPeer())
      return;

   /* Record activity so ShowStatusLine() holds the status line back
    * during bursts of output. */
   last.SetToCurrentTime();

   /* Append the data but leave the peer's position where it was.  The
    * position is kept in off_t rather than int, so it is not truncated once
    * more than 2 GiB has gone through the stream. */
   off_t oldpos = InputPeer()->GetPos();
   InputPeer()->Put(buf, size);
   InputPeer()->SetPos(oldpos);
}

void OutputJob::Format(const char *f,...)
{
   InitCopy();
   if(!InputPeer())
      return;

   int oldpos = InputPeer()->GetPos();

   va_list v;
   va_start(v,f);
   InputPeer()->vFormat(f, v);
   va_end(v);

   InputPeer()->SetPos(oldpos);
}

/* Propagate signals down to our child processes.
 *
 * Returns WANTDIE for SIGTERM/SIGINT and MOVED otherwise -- unless an input
 * CopyJob exists, in which case its AcceptSig() result is returned instead.
 * For any signal other than SIGCONT, SIGCONT is delivered afterwards via a
 * recursive call (presumably so stopped filter processes get to act on the
 * signal -- TODO confirm). */
int OutputJob::AcceptSig(int sig)
{
   int m=MOVED;
   if(sig == SIGTERM || sig == SIGINT)
      m=WANTDIE;
   
   /* If we have an input copier right now, it'll contain the top filter
    * (which is linked to all other filters), so send it the signal. */
   /* NOTE(review): this overwrites the WANTDIE chosen above with the
    * input job's answer. */
   if(input)
      m=input->AcceptSig(sig);
   /* Otherwise, the only filters we have running are in output_fd. */
   else
      output_fd->Kill(sig);
   if(sig!=SIGCONT)
      AcceptSig(SIGCONT);
   return m;
}

#ifndef OUTPUTJOB_H
#define OUTPUTJOB_H

#include "Job.h"
#include "FileCopy.h"
#include "CopyJob.h"
#include "TimeDate.h"

class StatusBar;

/* A job that accepts output from a command (via Put/Format) and delivers
 * it to a local stream, or to a file/URL, optionally passing it through a
 * shell filter chain first. */
class OutputJob : public Job
{
   /* Main CopyJob: takes what Put()/Format() write and sends it to
    * output_fd (through the filter, if any).  Created by InitCopy(). */
   CopyJob *input;

   /* CopyJob that sends to the output.  (output may be equal to input) */
   CopyJob *output;

   /* Stream the input CopyJob writes to; owned by this job. */
   FDStream *output_fd;
   
   /* Set once InitCopy() has built the copy chain. */
   bool initialized;

   /* Command name used in messages and status; owned copy. */
   char *a0;
   /* Filter command line (e.g. "a | b"), or 0 when unfiltered; owned copy. */
   char *filter;

   bool error;
   /* Whether the destination is stdout (from output_fd->usesfd(1)). */
   bool is_stdout;
   /* When false, a broken output ends the job without counting as an error. */
   bool fail_if_broken;

   /* if true, we never contribute to the parent job's status
    * (Status() == "") */
   bool no_status;

   /* Time of the most recent output; used by ShowStatusLine(). */
   Time last;
   /* Quiet period required before a status line is shown
    * (cmd:status-interval). */
   TimeInterval inter;

   void Init(const char *a0);
   void InitCopy();

   /* NOTE(review): no definition is visible in OutputJob.cc. */
   void SetError(const char *e, ...) PRINTF_LIKE(2,3);

   /* Get the input FileCopyPeer */
   FileCopyPeer *InputPeer() const;

   /* Get the output FileCopyPeer (the FileCopyPeer that's doing the final output) */
   FileCopyPeer *OutputPeer() const;
   
public:
   /* Output to a local stream; 0 means stdout.  Takes ownership. */
   OutputJob(FDStream *output, const char *a0);
   /* Output to a path or URL through fa (a new LocalAccess if 0). */
   OutputJob(const char *path, const char *a0, FA *fa=0);
   ~OutputJob();

   /* Set the main filter (only effective before the first write): */
   void SetFilter(const char *filter);
   /* Prepend a filter before the main filter: */
   void PreFilter(const char *filter);

   void DontFailIfBroken(bool n=false) { fail_if_broken=n; }
   bool Error();

   int Done();
   int Do();

   void Put(const char *buf,int size);
   void Put(const char *buf) { Put(buf,strlen(buf)); }
   void Format(const char *f,...) PRINTF_LIKE(2,3);
   void PutEOF();

   /* Return true if our buffers don't want any more input.  (They'll always
    * *accept* more input; this is optional.) */
   bool Full();

   /* Get properties of the output: */
   int GetWidth() const;
   bool IsTTY() const;
   /* Whether the ultimate destination is stdout: */
   bool IsStdout() const { return is_stdout; }
   /* Whether the output is filtered: */
   bool IsFiltered() const { return filter!=0; }

   /* Call before showing a StatusLine on a job using this class.  If it
    * returns false, don't display it. */
   bool ShowStatusLine();

   const char *Status(const StatusLine *s);
   void Reconfig(const char *r);

   void Fg();
   void Bg();
   void Suspend();
   void Resume();
   int AcceptSig(int sig);
};

#endif

Reply via email to