On Sun, Apr 14, 2002 at 01:16:38PM +0400, Alexander V. Lukyanov wrote: > > It could also simply be removed. Both gzip and bzip2 permit concatenating > > files. (In other words, remove for_each and make zcat and bz2cat regular > > filters.) > > Very nice. I did not know that.
I've removed the for_each code from OutputJob, and that simplified a good deal, though I'm still not quite satisfied with it. I don't think either FileCopy can be removed; they're needed fundamentally. I've hid the pipe in one of the ctors, adding a flag to FDStream to close the fd. FileCopyPeerOutputJob is a stub to quickly allow "cat" to use OutputJob. It's a little silly; data goes from the source of the cat, into FileCopyPeerOutputJob, which then sends it to an OutputJob--which promptly sends it back into a FileCopyPeer. It's extremely simple code, but it's also sending the data through more buffers. I wrote that so "cat" would work; it'd be nice if there's a more elegant way to do this. I don't know if this is a performance issue in practice. (Perhaps some way for a FileCopy to hook directly to the input FileCopy in the OutputJob; I'd rather not do that unless it's actually needed.) I'll attach my current OutputJob, so you can see where it's at now. -- Glenn Maynard
/* lftp and utils * * Copyright (c) 2002 by Alexander V. Lukyanov ([EMAIL PROTECTED]) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ /* Usage notes: * * Set AllowPostpone to true if sending large amounts of data. Check the * result of each Put and Format call to see if a write was postponed. * If disabled, writes will always succeed. * * This is useful for jobs with a lot of output, like "cat". This can be * set selectively, where convenient. For example, a job which outputs a * line of formatted text, followed by the contents of a file, can send * the first line with AllowPostpone off, then the file with it on. * * Call PreFilter() to add a filter to the beginning of the chain; these * filters are initialized only once for all data. For example, * PreFilter("wc -l") * */ /* * Implementation notes: * Background things we can't get around: * We must buffer (via FileCopy) output to a filter, since it might block. * * We must buffer the output from the filter to an output FileCopyPeer (ie. * a URL), for the same reason. * * So, we're stuck with having two FileCopy's. (One to send, one to filter.) * * In some cases, we only need one: if the output is an FD, the filter can * hook up directly and we can forget about that stage. * * In the case where we're outputting to a path (or URL), we set up a pipe * to it and pretend we're just outputting to a file; this simplifies things * significantly. This means in the simple case of having no filters at * all, writing to a URL or file, we send the data an extra time through * a FileCopy and a pipe. That's a bit inefficient, but that's * "cat file1 > file2"; that's normally done with "get file1 -o file2", so * this shouldn't happen often. * * It's very important that if the output is stdout, any filters point directly * at it, not through an extra copy: a pager, for example, will expect the output * to be a TTY. * */ #include <config.h> #include "OutputJob.h" #include "ArgV.h" #include "FileCopy.h" #include "CopyJob.h" #include "url.h" #include "misc.h" #include "StatusLine.h" #include "LocalAccess.h" #include <assert.h> #include <unistd.h> #include <errno.h> #include <sys/ioctl.h> #include <fcntl.h> #define super Job void OutputJob::InitCopy() { if(error) return; if(initialized) return; initialized=true; if(filter) { /* Create the global filter: */ OutputFilter *global = new OutputFilter(filter, output_fd); global->DeleteSecondaryStream(); output_fd=global; } /* Use a FileCopy to buffer our output to the filter: */ FileCopyPeerFDStream *out = new FileCopyPeerFDStream(output_fd, FileCopyPeer::PUT); out->DontDeleteStream(); FileCopy *input_fc = FileCopy::New(new FileCopyPeer(FileCopyPeer::GET), out, false); if(!fail_if_broken) input_fc->DontFailIfBroken(); char *buf = xasprintf(_("%s (filter)"), a0); input=new CopyJob(input_fc, buf, filter?filter:a0); xfree(buf); if(!output) output=input; input->SetParentFg(this); InputPeer()->SetDate(NO_DATE); InputPeer()->SetSize(NO_SIZE); input->GetCopy()->DontCopyDate(); input->NoStatus(); if(input != output) { output->SetParentFg(this); OutputPeer()->SetDate(NO_DATE); OutputPeer()->SetSize(NO_SIZE); output->GetCopy()->DontCopyDate(); output->NoStatus(); } if(is_stdout) { output->ClearStatusOnWrite(); output->GetCopy()->LineBuffered(); } Timeout(0); } void OutputJob::Init(const char *_a0) { input=output=0; filter=0; initialized=false; error=false; no_status=false; a0=xstrdup(_a0); last.Set(0,0); is_stdout=false; fail_if_broken=true; output_fd=0; } /* Local (fd) output. */ OutputJob::OutputJob(FDStream *output_, const char *a0): inter(1) { Init(a0); output_fd=output_; if(!output_fd) output_fd=new FDStream(1,"<stdout>"); else // some legitimate uses produce broken pipe condition (cat|head) // TODO: once actual piping uses OutputJob, set this only when // really doing a pipe, so cat>file can produce broken pipe fail_if_broken=false; is_stdout=output_fd->usesfd(1); /* We don't output status when outputting locally. */ no_status=true; /* Make sure that if the output is going to fail, it fails early, so * the parent doesn't start anything expensive (like begin downloading * a file.) */ if(output_fd->getfd() == -1) { eprintf("%s: %s\n", a0, output_fd->error_text); error=true; } } OutputJob::OutputJob(const char *path, const char *a0, FileAccess *fa): inter(1) { Init(a0); /* Set up a pipe sending data at the peer, so we can act like the FDStream * constructor. */ int filter_pipe[2]; if(pipe(filter_pipe) == -1) { /* FIXME: This can be retryable. */ eprintf("%s: %s\n", a0, strerror(errno)); error=true; /* This won't actually be written to, since error is set; we must set * it to something, though. */ output_fd=new FDStream(1, "<stdout>"); return; } bool reuse = false; if(!fa) { fa = new LocalAccess; reuse = true; } FileCopyPeerFA *dst_peer = FileCopyPeerFA::New(fa, path, FA::STORE, reuse); /* Status only for remote outputs. */ if(!strcmp(dst_peer->GetProto(), "file")) no_status=true; fcntl(filter_pipe[0],F_SETFL,O_NONBLOCK); fcntl(filter_pipe[1],F_SETFL,O_NONBLOCK); /* The output of the pipe (0) goes to the output FileCopy. */ FDStream *pipe_output = new FDStream(filter_pipe[0],"<filter-out>"); FileCopy *output_fc=FileCopy::New(new FileCopyPeerFDStream(pipe_output, FileCopyPeer::GET), dst_peer,false); output=new CopyJob(output_fc, path, a0); output_fd=new FDStream(filter_pipe[1],"<filter-in>"); pipe_output->CloseFD(); output_fd->CloseFD(); } OutputJob::~OutputJob() { Bg(); AcceptSig(SIGTERM); Delete(input); if(input != output) Delete(output); delete output_fd; xfree(a0); xfree(filter); } void OutputJob::Reconfig(const char *r) { if(!r || !strcmp(r,"cmd:status-interval")) { inter=TimeInterval((const char*)ResMgr::Query("cmd:status-interval",0)); } } bool OutputJob::ShowStatusLine() { /* If our output file is gone, or isn't stdout, we don't care, */ if(!output || !is_stdout) return true; /* If we're filtered, we never display at all. (We don't know anything about * the filter's output; the only time we do is when we're outputting to a URL * or a file, and that doesn't apply here.) */ if(IsFiltered()) return false; /* If we're not line buffered, display only if the output CopyJob says to. */ if(!output->GetCopy()->IsLineBuffered()) return output->HasStatus(); /* We're line buffered, so we can output a status line without stomping * on a partially output line. * * Don't display the statusline if the we've output something within the * last status interval, so if we're currently bursting output we won't * flicker status for no reason. (Actually, we should be concerned about * the last time the output peer has sent something...) */ if(now - last < inter) return false; last = now; /* Stop the output again, so the FileCopy will clear the StatusLine * when there's more data. */ output->GetCopy()->AllowWrite(false); return true; } const char *OutputJob::Status(const StatusLine *s) { if(no_status) return ""; /* Never show anything if we havn't even received any data yet; it won't * start connecting until then, so it's not interesting. */ if(!initialized) return ""; /* Use the status from the output CopyJob. It'll be the one that's connecting * to a host, if applicable. */ return output->Status(s,true); } void OutputJob::PutEOF() { /* Make sure we've sent at least one (empty) block. This ensures * that we always start the input->output code path. */ Put("", 0); if(InputPeer()) InputPeer()->PutEOF(); } /* add a filter to the beginning of the list */ void OutputJob::PreFilter(const char *newfilter) { if(!filter) { SetFilter(newfilter); return; } char *newstr = xasprintf("%s | %s", newfilter, filter); SetFilter(newstr); xfree(newstr); } void OutputJob::SetFilter(const char *newfilter) { xfree(filter); filter=xstrdup(newfilter); } int OutputJob::GetWidth() const { if(IsFiltered() || output_fd->getfd() != 1) return -1; return fd_width(1); } bool OutputJob::IsTTY() const { if(IsFiltered() || output_fd->getfd() != 1) return false; return isatty(1); } /* Get the input FileCopyPeer; this is the buffer we write to. */ FileCopyPeer *OutputJob::InputPeer() const { return input? input->GetGet():0; } /* Get the output FileCopyPeer (the FileCopyPeer that's doing the final output). */ FileCopyPeer *OutputJob::OutputPeer() const { return output? output->GetPut():0; } /* We're done if the output is finished, or on error. */ int OutputJob::Done() { if(Error()) return true; /* We're always done if the output breaks, regardless of whether * we treat it as an error or not. */ if(output_fd->broken()) return true; if(!initialized) return false; if(output && output->Done()) return true; return false; } int OutputJob::Do() { if(!fg_data && output_fd && output_fd->GetProcGroup()) { fg_data=new FgData(output_fd->GetProcGroup(),fg); return MOVED; } return STALL; } /* Don't register errors until they're actually printed by * the sub-job (ie. it's also Done()). */ bool OutputJob::Error() { if(error) return true; if(input && input->Error() && input->Done()) error=true; if(output && input != output && output->Error() && output->Done()) error=true; if(fail_if_broken && output_fd->broken()) error=true; return error; } void OutputJob::Fg() { super::Fg(); if(input) input->Fg(); if(output && input != output) output->Fg(); } void OutputJob::Bg() { if(output && input != output) output->Bg(); if(input) input->Bg(); super::Bg(); } void OutputJob::Suspend() { if(input) input->Suspend(); if(output && input != output) output->Suspend(); super::Suspend(); } void OutputJob::Resume() { if(input) input->Resume(); if(output && input != output) output->Resume(); super::Resume(); } bool OutputJob::Full() { if(input == 0) return false; /* It'd be nicer to just check copy->GetGet()->IsSuspended(), since * the FileCopy will suspend the Get end if the Put end gets filled. * However, it won't do that until it actually tries to send something. */ int size = 0; if(input->GetPut()) size += input->GetPut()->Buffered(); if(input->GetGet()) size += input->GetGet()->Buffered(); if(input != output) { if(output->GetPut()) size += output->GetPut()->Buffered(); if(output->GetGet()) size += output->GetGet()->Buffered(); } return size >= 0x10000; } /* We'll actually go over the buffer limit here; that's OK; it's not a * strict value. (It's not convenient to prevent that completely with * Format(), either.) */ void OutputJob::Put(const char *buf,int size) { InitCopy(); if(!InputPeer()) return; last.SetToCurrentTime(); int oldpos = InputPeer()->GetPos(); InputPeer()->Put(buf, size); InputPeer()->SetPos(oldpos); } void OutputJob::Format(const char *f,...) { InitCopy(); if(!InputPeer()) return; int oldpos = InputPeer()->GetPos(); va_list v; va_start(v,f); InputPeer()->vFormat(f, v); va_end(v); InputPeer()->SetPos(oldpos); } /* Propagate signals down to our child processes. */ int OutputJob::AcceptSig(int sig) { int m=MOVED; if(sig == SIGTERM || sig == SIGINT) m=WANTDIE; /* If we have an input copier right now, it'll contain the top filter * (which is linked to all other filters), so send it the signal. */ if(input) m=input->AcceptSig(sig); /* Otherwise, the only filters we have running are in output_fd. */ else output_fd->Kill(sig); if(sig!=SIGCONT) AcceptSig(SIGCONT); return m; }
#ifndef OUTPUTJOB_H #define OUTPUTJOB_H #include "Job.h" #include "FileCopy.h" #include "CopyJob.h" #include "TimeDate.h" class StatusBar; class OutputJob : public Job { /* Main CopyJob: */ CopyJob *input; /* CopyJob that sends to the output. (output may be equal to input) */ CopyJob *output; FDStream *output_fd; bool initialized; char *a0; char *filter; bool error; bool is_stdout; bool fail_if_broken; /* if true, we never contribute to the parent job's status * (Status() == "") */ bool no_status; Time last; TimeInterval inter; void Init(const char *a0); void InitCopy(); void SetError(const char *e, ...) PRINTF_LIKE(2,3); /* Get the input FileCopyPeer */ FileCopyPeer *InputPeer() const; /* Get the output FileCopyPeer (the FileCopyPeer that's doing the final output) */ FileCopyPeer *OutputPeer() const; public: OutputJob(FDStream *output, const char *a0); OutputJob(const char *path, const char *a0, FA *fa=0); ~OutputJob(); /* Set the main filter: */ void SetFilter(const char *filter); /* Prepend a filter before the main filter: */ void PreFilter(const char *filter); void DontFailIfBroken(bool n=false) { fail_if_broken=n; } bool Error(); int Done(); int Do(); void Put(const char *buf,int size); void Put(const char *buf) { return Put(buf,strlen(buf)); } void Format(const char *f,...) PRINTF_LIKE(2,3); void PutEOF(); /* Return true if our buffers don't want any more input. (They'll always * *accept* more input; this is optional.) */ bool Full(); /* Get properties of the output: */ int GetWidth() const; bool IsTTY() const; /* Whether the ultimate destination is stdout: */ bool IsStdout() const { return is_stdout; } /* Whether the output is filtered: */ bool IsFiltered() const { return filter; } /* Call before showing a StatusLine on a job using this class. If it * returns false, don't display it. */ bool ShowStatusLine(); const char *Status(const StatusLine *s); void Reconfig(const char *r); void Fg(); void Bg(); void Suspend(); void Resume(); int AcceptSig(int sig); }; #endif