Our team has used the chukwa *CharFileTailingAdaptorUTF8* to collect log4j
rotated log files for several months. It has helped us collect logs from
everywhere into our hadoop center.
During this work we ran into several problems. I have raised them on this
mailing list, but I still haven't found a good solution.
So we read the source code and made some changes.

Our log files are generated by log4j, and the log4j appender is
org.apache.log4j.DailyRollingFileAppender.
If you use log4j to produce rotated logs, this mail may help you.
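For context, here is roughly how such an appender can be set up programmatically
(a minimal sketch; the path and pattern are just examples, not our real
configuration). The fixed-name file is what the chukwa adaptor tails; at each
rollover log4j renames it with the date suffix and starts a new, empty file at
the fixed name:

import java.io.IOException;

import org.apache.log4j.DailyRollingFileAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class Log4jRotationExample {
  public static void main(String[] args) throws IOException {
    // Fixed-name file that the adaptor watches; rotated copies get a ".yyyy-MM-dd" suffix.
    DailyRollingFileAppender appender = new DailyRollingFileAppender(
        new PatternLayout("%d{ISO8601} %p %c - %m%n"),
        "/var/log/myapp/app.log",
        "'.'yyyy-MM-dd");
    Logger.getRootLogger().addAppender(appender);
    Logger.getLogger(Log4jRotationExample.class).info("hello");
  }
}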

The following two problems are why we had to modify the source code.

1. The checkpoint size does not match the file size.

     I raised this problem on May 14, "the check point offset is bigger
than the log file size". Ariel Rabkin and Eric answered my question;
thanks for your replies.

     When chukwa starts, it reads the checkpoint file and uses the recorded
size as the fileReadOffset. The size in the checkpoint indicates how many
bytes the adaptor has already sent.

     If the log source is a stream, or a file that never rotates, this size
is correct; it really is the fileReadOffset. But when the file is rotated,
the checkpoint size is often bigger than the file size, and this causes
chukwa to resend the whole log file.
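     To make the mismatch concrete, here is a minimal illustration (the
numbers are made up) of the state after a restart with a stale checkpoint:

long checkpointOffset = 1500000L; // bytes acknowledged before the restart, read from the checkpoint
long rotatedFileLength = 42000L;  // the fixed-name file is now a fresh, much smaller file

// The adaptor seeds its read position from the checkpoint...
long fileReadOffset = checkpointOffset;

// ...but the file it reopens is the new one, so the offset no longer fits:
if (rotatedFileLength < fileReadOffset) {
  // treated as "file shrank": the offset is reset to 0 and the whole new file is resent
  fileReadOffset = 0L;
}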

     So we added log.info("chunk seqID:"+c.getSeqID()); in
ChukwaHttpSender.send():

    for (Chunk c : toSend) {
      DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
      try {
        c.write(b);
      } catch (IOException err) {
        log.error("serialization threw IOException", err);
      }
      serializedEvents.add(b);
      // store a CLE for this chunk which we will use to ack this chunk to the
      // caller of send()
      // (e.g. the agent will use the list of CLE's for checkpointing)
      log.info("chunk seqID:"+c.getSeqID());
      commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(),
         c.getSeqID() - c.getData().length));
    }
     The seqID is the end offset of the sent chunk in this log file.
     So when we need to restart chukwa, we just stop it, change the size in
the checkpoint to the last chunk seqID that appears in the agent log, and
start chukwa again.
     We could also apply the seqID directly to the checkpoint size, but we
don't know whether this would cause other problems.
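     The arithmetic behind that log line, as a minimal sketch (the sizes are
made up); it simply mirrors the CommitListEntry that send() builds:

long seqID = 1048576L;       // what "chunk seqID:" prints for the last acknowledged chunk
int chunkLength = 64 * 1024; // c.getData().length

// new CommitListEntry(c.getInitiator(), c.getSeqID(), c.getSeqID() - c.getData().length)
long end = seqID;                 // offset just past the chunk; this is what we write into the checkpoint
long start = seqID - chunkLength; // where that chunk began in the file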

2. The method tailFile in FileTailingAdaptor is the core of the log
collection. The code uses fileReadOffset and the file length to detect a
rotated file:
        RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
        len = reader.length();
        long newLength = newReader.length();
        if (newLength < len && fileReadOffset >= len) {
          if (reader != null) {
            reader.close();
          }

          reader = newReader;
          fileReadOffset = 0L;
          log.debug("Adaptor|" + adaptorID + "| File size mismatched, rotating: "
              + toWatch.getAbsolutePath());
        } else {
          try {
            if (newReader != null) {
              newReader.close();
            }
            newReader = null;
          } catch (Throwable e) {
            // do nothing.
          }
        }
     This logic works in most cases. But there is a case where, when chukwa
starts, the log file size is 0, and it stays 0 until the file is rotated.
After the rotation, because its size is 0, the old file is removed; a new
file is generated, and its size isn't 0.
     But len is still 0 while newLength is > 0, so the condition if
(newLength < len && fileReadOffset >= len) is never satisfied. The new log
file will never be detected.
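     To spell that out, a minimal illustration (the values are hypothetical)
of why the old check misses this rotation:

long len = 0L;            // reader still points at the old, empty file: reader.length() == 0
long fileReadOffset = 0L; // nothing has ever been read from it
long newLength = 8192L;   // newReader points at the freshly created file, which already has data

if (newLength < len && fileReadOffset >= len) {
  // never reached: 8192 < 0 is false, so the rotation is never detected
}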

        So we changed the implementation of this method to use a timestamp
to detect the new log file. lastSlurpTime is the timestamp of the last
slurp; it is declared and assigned in LWFTAdaptor.
       try {
           len = reader.length();
           if (lastSlurpTime == 0) {
               lastSlurpTime = System.currentTimeMillis();
           }
           if (offsetOfFirstByte > fileReadOffset) {
               // If the file rotated, the recorded offsetOfFirstByte is greater than
               // the file size; reset the first byte position to the beginning of the file.
               fileReadOffset = 0;
               offsetOfFirstByte = 0L;
               log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
           }
           if (len == fileReadOffset) {
               File fixedNameFile = new File(toWatch.getAbsolutePath());
               long fixedNameLastModified = fixedNameFile.lastModified();
               if (fixedNameLastModified > lastSlurpTime) {
                   // If len == fileReadOffset, either the file has stopped growing
                   // or it has rotated. Since fixedNameLastModified > lastSlurpTime,
                   // the file was written after the last slurp, so it has rotated.
                   boolean hasLeftData = true;
                   while (hasLeftData) { // read any data left in the old file
                       hasLeftData = slurp(len, reader);
                   }
                   RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
                   if (reader != null) {
                       reader.close();
                   }
                   reader = newReader;
                   fileReadOffset = 0L;
                   len = reader.length();
                   log.debug("Adaptor|" + adaptorID + "| File size mismatched, rotating: "
                           + toWatch.getAbsolutePath());
               }
               hasMoreData = slurp(len, reader);
           } else if (len < fileReadOffset) {
               // file has rotated and no detection
               if (reader != null) {
                   reader.close();
               }
               reader = null;
               fileReadOffset = 0L;
               offsetOfFirstByte = 0L;
               hasMoreData = true;
               log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
                       + ", has rotated and no detection - reset counters to 0L");
           } else {
               hasMoreData = slurp(len, reader);
           }
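
The rotation decision itself boils down to the following condensed
restatement (under the lastSlurpTime semantics described above; the names
mirror the adaptor fields, the values are made up):

long len = 131072L;                  // reader.length() of the still-open, renamed old file
long fileReadOffset = 131072L;       // everything in it has already been slurped
long lastSlurpTime = 1400000000000L; // set by slurp() just before it reads
long fixedNameLastModified = lastSlurpTime + 5000L; // the fixed-name path was written after that slurp

if (len == fileReadOffset && fixedNameLastModified > lastSlurpTime) {
  // the fixed name must now point at a new file: drain the old reader,
  // reopen toWatch, and reset fileReadOffset to 0
}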


We hope these two changes will help the adaptor collect rotated files more
reliably.

If there is anything wrong, please let me know.

Thanks!



-- 
Best regards,

Ivy Tang
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
 * A base class for file tailing adaptors.  
 * Intended to mandate as little policy as possible, and to use as 
 * few system resources as possible.
 * 
 * 
 * If the file does not exist, this class will continue to retry quietly
 * forever and will start tailing if it's eventually created.
 */
public class LWFTAdaptor extends AbstractAdaptor {
  
  /**
   * This is the maximum amount we'll read from any one file before moving on to
   * the next. This way, we get quick response time for other files if one file
   * is growing rapidly.
   * 
   */
  public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
  public static final String MAX_READ_SIZE_OPT = 
      "chukwaAgent.fileTailingAdaptor.maxReadSize";

  public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
  
  static Logger log;
  protected static FileTailer tailer;
  
  static {
    tailer = null;
    log = Logger.getLogger(FileTailingAdaptor.class);
  }
  
  
  /**
   * next PHYSICAL offset to read
   */
  protected long fileReadOffset;

  /**
   * The logical offset of the first byte of the file
   */
  protected long offsetOfFirstByte = 0;
  protected Configuration conf = null;
  
  protected long lastSlurpTime = 0l;
  
  File toWatch;

  @Override
  public void start(long offset) {
    synchronized(LWFTAdaptor.class) {
      if(tailer == null)
        tailer = new FileTailer(control.getConfiguration());
    }
    this.fileReadOffset = offset - offsetOfFirstByte;    
    tailer.startWatchingFile(this);
  }
  
  /**
   * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
   */
  public String getCurrentStatus() {
    return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath();
  }

  public String toString() {
    return "Lightweight Tailer on " + toWatch;
  }

  public String getStreamName() {
    return toWatch.getPath();
  }
  
  @Override
  public String parseArgs(String params) { 
    conf = control.getConfiguration();
    MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);

    Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
    Matcher m = cmd.matcher(params);
    if (m.matches()) { //check for first-byte offset. If absent, assume we just got a path.
      offsetOfFirstByte = Long.parseLong(m.group(1));
      toWatch = new File(m.group(2));
    } else {
      toWatch = new File(params.trim());
    }
    return toWatch.getAbsolutePath();
  }

  @Override
  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
      throws AdaptorException {
    tailer.stopWatchingFile(this);
    return fileReadOffset + offsetOfFirstByte;
  }
  

  /**
   * Extract records from a byte sequence
   * 
   * @param eq the queue to stick the new chunk[s] in
   * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
   * @param buf the byte buffer to extract records from
   * @return the number of bytes processed
   * @throws InterruptedException
   */
  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
      byte[] buf) throws InterruptedException {
    if(buf.length == 0)
      return 0;
    
    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
        buffOffsetInFile + buf.length, buf, this);

    eq.add(chunk);
    return buf.length;
  }
  
  protected boolean slurp(long len, RandomAccessFile reader) throws IOException,
  InterruptedException{
    boolean hasMoreData = false;

    log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset);
    reader.seek(fileReadOffset);

    long bufSize = len - fileReadOffset;

    if (bufSize > MAX_READ_SIZE) {
      bufSize = MAX_READ_SIZE;
      hasMoreData = true;
    }
    byte[] buf = new byte[(int) bufSize];

    long curOffset = fileReadOffset;

    lastSlurpTime = System.currentTimeMillis();
    int bufferRead = reader.read(buf);
    assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
        + " pointer is "
        + reader.getFilePointer()
        + " but offset is "
        + fileReadOffset + bufSize;

    int bytesUsed = extractRecords(dest,
        fileReadOffset + offsetOfFirstByte, buf);

    // === WARNING ===
    // If we could not find a complete record AND
    // we cannot read more, i.e. bufferRead == MAX_READ_SIZE,
    // it is because the record is too BIG.
    // So log.warn, and drop the current buffer so we can keep moving
    // instead of being stuck at that point forever.
    if (bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
      log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, dropping current buffer: startOffset="
              + curOffset
              + ", MAX_READ_SIZE="
              + MAX_READ_SIZE
              + ", for "
              + toWatch.getPath());
      bytesUsed = buf.length;
    }

    fileReadOffset = fileReadOffset + bytesUsed;

    log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"
        + fileReadOffset);
    return hasMoreData;
  }
  
  public synchronized boolean tailFile()
  throws InterruptedException {
    boolean hasMoreData = false;
    try {
      
       //if file doesn't exist, length =0 and we just keep waiting for it.
      //if(!toWatch.exists())
      //  deregisterAndStop(false);
      
      long len = toWatch.length();
      if(len < fileReadOffset) {
        //file shrank; probably some data went missing.
        handleShrunkenFile(len);
      } else if(len > fileReadOffset) {
        RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
        slurp(len, reader);
        reader.close();
      }
    } catch(IOException e) {
      log.warn("IOException in tailer", e);
      deregisterAndStop();
    }
    
    return hasMoreData;
  }

  private void handleShrunkenFile(long measuredLen) {
    log.info("file "+ toWatch +"shrank from " + fileReadOffset + " to " + measuredLen);
    offsetOfFirstByte = measuredLen;
    fileReadOffset = 0;
  }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.*;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
 * An adaptor that repeatedly tails a specified file, sending the new bytes.
 * This class does not split out records, but just sends everything up to end of
 * file. Subclasses can alter this behavior by overriding extractRecords().
 * 
 */
public class FileTailingAdaptor extends LWFTAdaptor {

    public static int MAX_RETRIES = 300;
    public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes

    private int attempts = 0;
    private long gracefulPeriodExpired = 0l;
    private boolean adaptorInError = false;

    protected RandomAccessFile reader = null;

    

    public void start(long bytes) {
        super.start(bytes);
        log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
        this.attempts = 0;

        log.info("started file tailer " + adaptorID + "  on file " + toWatch + " with first byte at offset " + offsetOfFirstByte);
    }

    @Override
    public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {

        log.info("Enter Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);

        switch (shutdownPolicy) {
        case GRACEFULLY:
        case WAIT_TILL_FINISHED: {
            if (toWatch.exists()) {
                int retry = 0;
                tailer.stopWatchingFile(this);
                TerminatorThread lastTail = new TerminatorThread(this);
                lastTail.setDaemon(true);
                lastTail.start();

                if (shutdownPolicy.ordinal() == AdaptorShutdownPolicy.GRACEFULLY.ordinal()) {
                    while (lastTail.isAlive() && retry < 60) {
                        try {
                            log.info("GRACEFULLY Retry:" + retry);
                            Thread.sleep(1000);
                            retry++;
                        } catch (InterruptedException ex) {
                        }
                    }
                } else {
                    while (lastTail.isAlive()) {
                        try {
                            if (retry % 100 == 0) {
                                log.info("WAIT_TILL_FINISHED Retry:" + retry);
                            }
                            Thread.sleep(1000);
                            retry++;
                        } catch (InterruptedException ex) {
                        }
                    }
                }
            }
        }
            break;

        case HARD_STOP:
        default:
            tailer.stopWatchingFile(this);
            try {
                if (reader != null) {
                    reader.close();
                }
                reader = null;
            } catch (Throwable e) {
                log.warn("Exception while closing reader:", e);
            }
            break;
        }
        log.info("Exit Shutdown:" + shutdownPolicy.name() + " - ObjectId:" + this);
        return fileReadOffset + offsetOfFirstByte;
    }

    /**
     * Looks at the tail of the associated file, adds some of it to event queue
     * This method is not thread safe. Returns true if there's more data in the
     * file
     * 
     * @param eq
     *            the queue to write Chunks into
     */
    @Override
    public synchronized boolean tailFile() throws InterruptedException {
        boolean hasMoreData = false;

        try {
            if ((adaptorInError == true) && (System.currentTimeMillis() > gracefulPeriodExpired)) {
                if (!toWatch.exists()) {
                    log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts + "| File does not exist: " + toWatch.getAbsolutePath()
                            + ", streaming policy expired.  File removed from streaming.");
                } else if (!toWatch.canRead()) {
                    log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts + "| File cannot be read: " + toWatch.getAbsolutePath()
                            + ", streaming policy expired.  File removed from streaming.");
                } else {
                    // Should have never been there
                    adaptorInError = false;
                    gracefulPeriodExpired = 0L;
                    attempts = 0;
                    return false;
                }

                deregisterAndStop();
                return false;
            } else if (!toWatch.exists() || !toWatch.canRead()) {
                if (adaptorInError == false) {
                    long now = System.currentTimeMillis();
                    gracefulPeriodExpired = now + GRACEFUL_PERIOD;
                    adaptorInError = true;
                    attempts = 0;
                    log.warn("failed to stream data for: " + toWatch.getAbsolutePath() + ", graceful period will Expire at now:" + now
                            + " + " + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
                } else if (attempts % 10 == 0) {
                    log.info("failed to stream data for: " + toWatch.getAbsolutePath() + ", attempt: " + attempts);
                }

                attempts++;
                return false; // no more data
            }

            if (reader == null) {
                reader = new RandomAccessFile(toWatch, "r");
                log.info("Adaptor|" + adaptorID + "|Opening the file for the first time|seek|" + fileReadOffset);
            }
            

            long len = 0L;
            try {
                len = reader.length();
                if(lastSlurpTime == 0){
                    lastSlurpTime = System.currentTimeMillis();
                }
                if (offsetOfFirstByte > fileReadOffset) {
                    // If the file rotated, the recorded offsetOfFirstByte is greater than
                    // the file size; reset the first byte position to the beginning of the file.
                    fileReadOffset = 0;
                    offsetOfFirstByte = 0L;
                    log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
                }                     
                if (len == fileReadOffset) {
                    File fixedNameFile = new File(toWatch.getAbsolutePath());
                    long fixedNameLastModified = fixedNameFile.lastModified();
                    if (fixedNameLastModified > lastSlurpTime) {
                        // If len == fileReadOffset, either the file has stopped growing
                        // or it has rotated. Since fixedNameLastModified > lastSlurpTime,
                        // the file was written after the last slurp, so it has rotated.
                        boolean hasLeftData = true;
                        while(hasLeftData){ // read any data left in the old file
                            hasLeftData = slurp(len, reader);
                        }
                        RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
                        if (reader != null) {
                            reader.close();
                        }
                        reader = newReader;
                        fileReadOffset = 0L;
                        len = reader.length();
                        log.debug("Adaptor|" + adaptorID + "| File size mismatched, rotating: " + toWatch.getAbsolutePath());                        
                        
                    } 
                    hasMoreData = slurp(len, reader);
                } else if (len < fileReadOffset) {
                    // file has rotated and no detection
                    if (reader != null) {
                        reader.close();
                    }
                    reader = null;
                    fileReadOffset = 0L;
                    offsetOfFirstByte = 0L;
                    hasMoreData = true;
                    log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
                            + ", has rotated and no detection - reset counters to 0L");
                } else {                               
                    hasMoreData = slurp(len, reader);
                }

            } catch (IOException e) {
                // do nothing, if file doesn't exist.
            }
        } catch (IOException e) {
            log.warn("failure reading " + toWatch, e);
        }
        attempts = 0;
        adaptorInError = false;
        return hasMoreData;
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.sender;


import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpMethodBase;
import org.apache.commons.httpclient.HttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.methods.*;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.log4j.Logger;

/**
 * Encapsulates all of the http setup and connection details needed for chunks
 * to be delivered to a collector.
 * 
 * This class should encapsulate the details of the low level data formatting.
 * The Connector is responsible for picking what to send and to whom;
 * retry policy is encoded in the collectors iterator.
 * 
 * This class is not thread safe. Synchronization is the caller's responsibility.
 * 
 * <p>
 * On error, tries the list of available collectors, pauses for a minute, and
 * then repeats.
 * </p>
 * <p>
 * Will wait forever for collectors to come up.
 * </p>
 */
public class ChukwaHttpSender implements ChukwaSender {
  final int MAX_RETRIES_PER_COLLECTOR; // fast retries, in http client
  final int SENDER_RETRIES;
  final int WAIT_FOR_COLLECTOR_REBOOT;
  final int COLLECTOR_TIMEOUT;
  
  public static final String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";
  // FIXME: this should really correspond to the timer in RetryListOfCollectors

  static final HttpSenderMetrics metrics = new HttpSenderMetrics("ChukwaAgent", "chukwaHttpSender");
  
  static Logger log = Logger.getLogger(ChukwaHttpSender.class);
  static HttpClient client = null;
  static MultiThreadedHttpConnectionManager connectionManager = null;
  String currCollector = null;
  int postID = 0;

  protected Iterator<String> collectors;

  static {
    connectionManager = new MultiThreadedHttpConnectionManager();
    client = new HttpClient(connectionManager);
    connectionManager.closeIdleConnections(1000);
  }

  public static class CommitListEntry {
    public Adaptor adaptor;
    public long uuid;
    public long start; //how many bytes of stream
    public CommitListEntry(Adaptor a, long uuid, long start) {
      adaptor = a;
      this.uuid = uuid;
      this.start = start;
    }
  }

  // FIXME: probably we're better off with an EventListRequestEntity
  static class BuffersRequestEntity implements RequestEntity {
    List<DataOutputBuffer> buffers;

    public BuffersRequestEntity(List<DataOutputBuffer> buf) {
      buffers = buf;
    }

    public long getContentLength() {
      long len = 4;// first we send post length, then buffers
      for (DataOutputBuffer b : buffers)
        len += b.getLength();
      return len;
    }

    public String getContentType() {
      return "application/octet-stream";
    }

    public boolean isRepeatable() {
      return true;
    }

    public void writeRequest(OutputStream out) throws IOException {
      DataOutputStream dos = new DataOutputStream(out);
      dos.writeInt(buffers.size());
      for (DataOutputBuffer b : buffers)
        dos.write(b.getData(), 0, b.getLength());
    }
  }

  public ChukwaHttpSender(Configuration c) {
    // setup default collector
    ArrayList<String> tmp = new ArrayList<String>();
    this.collectors = tmp.iterator();

    MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
    SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
    WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
        20 * 1000);
    COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
  }

  /**
   * Set up a list of connectors for this client to send {@link Chunk}s to
   * 
   * @param collectors
   */
  public void setCollectors(Iterator<String> collectors) {
    this.collectors = collectors;
    // setup a new destination from our list of collectors if one hasn't been
    // set up
    if (currCollector == null) {
      if (collectors.hasNext()) {
        currCollector = collectors.next();
      } else
        log.error("No collectors to try in send(), won't even try to do doPost()");
    }
  }

  /**
   * grab all of the chunks currently in the chunkQueue, stores a copy of them
   * locally, calculates their size, sets them up
   * 
   * @return array of chunk id's which were ACKed by collector
   */
  @Override
  public List<CommitListEntry> send(List<Chunk> toSend)
      throws InterruptedException, IOException {
    List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
    List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();

    int thisPost = postID++;
    log.info("collected " + toSend.size() + " chunks for post_"+thisPost);

    // Serialize each chunk in turn into it's own DataOutputBuffer and add that
    // buffer to serializedEvents
    for (Chunk c : toSend) {
      DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
      try {
        c.write(b);
      } catch (IOException err) {
        log.error("serialization threw IOException", err);
      }
      serializedEvents.add(b);
      // store a CLE for this chunk which we will use to ack this chunk to the
      // caller of send()
      // (e.g. the agent will use the list of CLE's for checkpointing)
      log.info("chunk seqID:"+c.getSeqID());
      commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(), 
         c.getSeqID() - c.getData().length));
    }
    

    // collect all serialized chunks into a single buffer to send
    RequestEntity postData = new BuffersRequestEntity(serializedEvents);

    PostMethod method = new PostMethod();
    method.setRequestEntity(postData);
    log.info(">>>>>> HTTP post_"+thisPost + " to " + currCollector + " length = " + postData.getContentLength());

    List<CommitListEntry> results =  postAndParseResponse(method, commitResults);
    log.info("post_" + thisPost + " sent " + toSend.size() + " chunks, got back " + results.size() + " acks");
    toSend.clear();
    return results;
  }
  
  /**
   * 
   * @param method the data to push
   * @param expectedCommitResults the list
   * @return the list of committed chunks
   * @throws IOException
   * @throws InterruptedException
   */
  public List<CommitListEntry> postAndParseResponse(PostMethod method, 
        List<CommitListEntry> expectedCommitResults)
  throws IOException, InterruptedException{
    reliablySend(method, "chukwa"); //FIXME: shouldn't need to hardcode this here
    return expectedCommitResults;
  }

  /**
   *  Responsible for executing the supplied method on at least one collector
   * @param method
   * @return
   * @throws InterruptedException
   * @throws IOException if no collector responds with an OK
   */
  protected List<String> reliablySend(HttpMethodBase method, String pathSuffix) throws InterruptedException, IOException {
    int retries = SENDER_RETRIES;
    while (currCollector != null) {
      // need to pick a destination here
      try {

        // send it across the network    
        List<String> responses = doRequest(method, currCollector+ pathSuffix);

        retries = SENDER_RETRIES; // reset count on success

        return responses;
      } catch (Throwable e) {
        log.error("Http post exception on "+ currCollector +": "+ e.toString());
        log.debug("Http post exception on "+ currCollector, e);
        ChukwaHttpSender.metrics.httpThrowable.inc();
        if (collectors.hasNext()) {
          ChukwaHttpSender.metrics.collectorRollover.inc();
          boolean repeatPost = failedCollector(currCollector);
          currCollector = collectors.next();
          if(repeatPost)
            log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
                + currCollector);
          else {
            log.info("Using " + currCollector + " in the future, but not retrying this post");
            break;
          }
        } else {
          if (retries > 0) {
            log.warn("No more collectors to try rolling over to; waiting "
                + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
                + "retries left)");
            Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
            retries--;
          } else {
            log.error("No more collectors to try rolling over to; aborting post");
            throw new IOException("no collectors");
          }
        }
      } finally {
        // be sure the connection is released back to the connection manager
        method.releaseConnection();
      }
    } // end retry loop
    return new ArrayList<String>();
  }

  /**
   * A hook for taking action when a collector is declared failed.
   * Returns whether to retry current post, or junk it
   * @param downCollector
   */
  protected boolean failedCollector(String downCollector) {
    log.debug("declaring "+ downCollector + " down");
    return true;
  }

  /**
   * Responsible for performing a single operation to a specified collector URL.
   * 
   * @param dest the URL being requested. (Including hostname)
   */
  protected List<String> doRequest(HttpMethodBase method, String dest)
      throws IOException, HttpException {

    HttpMethodParams pars = method.getParams();
    pars.setParameter(HttpMethodParams.RETRY_HANDLER,
        (Object) new HttpMethodRetryHandler() {
          public boolean retryMethod(HttpMethod m, IOException e, int exec) {
            return !(e instanceof java.net.ConnectException)
                && (exec < MAX_RETRIES_PER_COLLECTOR);
          }
        });

    pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(COLLECTOR_TIMEOUT));

    method.setParams(pars);
    method.setPath(dest);

    // Send POST request
    ChukwaHttpSender.metrics.httpPost.inc();
    
    int statusCode = client.executeMethod(method);

    if (statusCode != HttpStatus.SC_OK) {
      ChukwaHttpSender.metrics.httpException.inc();
      
      if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT ) {
        ChukwaHttpSender.metrics.httpTimeOutException.inc();
      }
      
      log.error(">>>>>> HTTP response from " + dest + " statusLine: " + method.getStatusLine());
      // do something aggressive here
      throw new HttpException("got back a failure from server");
    }
    // implicitly "else"
    log.info(">>>>>> HTTP Got success back from "+ dest + "; response length "
            + method.getResponseContentLength());

    // FIXME: should parse acks here
    InputStream rstream = null;

    // Get the response body
    byte[] resp_buf = method.getResponseBody();
    rstream = new ByteArrayInputStream(resp_buf);
    BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
    String line;
    List<String> resp = new ArrayList<String>();
    while ((line = br.readLine()) != null) {
      if (log.isDebugEnabled()) {
        log.debug("response: " + line);
      }
      resp.add(line);
    }
    return resp;
  }

  @Override
  public void stop() {
  }
}
