    +package org.apache.spark.flume.sink
    +import org.apache.flume.sink.AbstractSink
    +import java.util.concurrent.locks.ReentrantLock
    +import org.apache.flume.Sink.Status
    +import org.apache.spark.flume.{SparkSinkEvent, EventBatch, 
    +import scala.util.control.Breaks
    +import java.nio.ByteBuffer
    +import org.apache.flume.{FlumeException, Context}
    +import org.slf4j.LoggerFactory
    +import java.util.concurrent.atomic.AtomicLong
    +import org.apache.commons.lang.RandomStringUtils
    +import java.util.concurrent._
    +import java.util
    +import org.apache.flume.conf.{ConfigurationException, Configurable}
    +import org.apache.avro.ipc.NettyServer
    +import org.apache.avro.ipc.specific.SpecificResponder
    +class SparkSink() extends AbstractSink with Configurable {
    +  private val LOG = LoggerFactory.getLogger(this.getClass)
    +  private val lock = new ReentrantLock()
    +  private val blockingCondition = lock.newCondition()
    +  // This sink will not persist sequence numbers and reuses them if it 
gets restarted.
    +  // So it is possible to commit a transaction which may have been meant 
for the sink before the
    +  // restart.
    +  // Since the new txn may not have the same sequence number we must guard 
against accidentally
    +  // committing
    +  // a new transaction. To reduce the probability of that happening a 
random string is prepended
    +  // to the sequence number.
    +  // Does not change for life of sink
    +  private val seqBase = RandomStringUtils.randomAlphanumeric(8)
    +  // Incremented for each transaction
    +  private val seqNum = new AtomicLong(0)
    +  private var transactionExecutorOpt: Option[ExecutorService] = None
    +  private var numProcessors: Integer = 
    +  private var transactionTimeout = 
    +  private val processorMap = new ConcurrentHashMap[CharSequence, 
    +  private var processorFactory: Option[SparkHandlerFactory] = None
    +  private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
    +  private var port: Int = 0
    +  private var maxThreads: Int = SparkSinkConfig.DEFAULT_MAX_THREADS
    +  private var serverOpt: Option[NettyServer] = None
    +  private var running = false
    +  override def start() {
    +    transactionExecutorOpt = 
    +      new ThreadFactoryBuilder().setDaemon(true)
    +        .setNameFormat("Spark Sink, " + getName + " Processor Thread - 
    +    processorFactory = Option(new SparkHandlerFactory(numProcessors))
    +    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new 
    +    // Using the constructor that takes specific thread-pools requires 
bringing in netty
    +    // dependencies which are being excluded in the build. In practice,
    +    // Netty dependencies are already available on the JVM as Flume would 
have pulled them in.
    +    serverOpt = Option(new NettyServer(responder, new 
InetSocketAddress(hostname, port)))
    + => server.start())
    +    lock.lock()
    +    try {
    +      running = true
    +    } finally {
    +      lock.unlock()
    +    }
    +    super.start()
    +  }
    +  override def stop() {
    + => executor.shutdownNow())
    + => {
    +      server.close()
    +      server.join()
    +    })
    +    lock.lock()
    +    try {
    +      running = false
    +      blockingCondition.signalAll()
    +    } finally {
    +      lock.unlock()
    +    }
    +  }
    +  override def configure(ctx: Context) {
    +    import SparkSinkConfig._
    +    hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
    +    val portOpt = Option(ctx.getInteger(CONF_PORT))
    +    if(portOpt.isDefined) {
    +      port = portOpt.get
    +    } else {
    +      throw new ConfigurationException("The Port to bind must be 
    +    }
    +    numProcessors = ctx.getInteger(PROCESSOR_COUNT, 
    +    transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, 
    +    maxThreads = ctx.getInteger(CONF_MAX_THREADS, DEFAULT_MAX_THREADS)
    +  }
    +  override def process(): Status = {
    +    // This method is called in a loop by the Flume framework - block it 
until the sink is
    +    // stopped to save CPU resources
    +    lock.lock()
    +    try {
    +      while(running) {
    +        blockingCondition.await()
    +      }
    +    } finally {
    +      lock.unlock()
    +    }
    +    Status.BACKOFF
    +  }
    +  // Object representing an empty batch returned by the txn processor due 
to some error.
    +  case object ErrorEventBatch extends EventBatch
    +  private class AvroCallbackHandler() extends SparkFlumeProtocol {
    +    override def getEventBatch(n: Int): EventBatch = {
    +      val processor = processorFactory.get.checkOut(n)
    + => executor.submit(processor))
    +      // Wait until a batch is available - can be null if some error was 
    +      val eventBatch = processor.eventQueue.take()
    +      eventBatch match {
    +        case ErrorEventBatch => throw new FlumeException("Something went 
wrong. No events" +
    +          " retrieved from channel.")
    +        case events => {
    +          processorMap.put(events.getSequenceNumber, processor)
    +          if (LOG.isDebugEnabled) {
    +            LOG.debug("Sent " + events.getEventBatch.size() +
    +              " events with sequence number: " + events.getSequenceNumber)
    +          }
    +          events
    +        }
    +      }
    +    }
    +    override def ack(sequenceNumber: CharSequence): Void = {
    +      completeTransaction(sequenceNumber, success = true)
    +      null
    +    }
    +    override def nack(sequenceNumber: CharSequence): Void = {
    +      completeTransaction(sequenceNumber, success = false)
    +"Spark failed to commit transaction. Will reattempt 
    +      null
    +    }
    +    def completeTransaction(sequenceNumber: CharSequence, success: 
Boolean) {
    +      val processorOpt = Option(processorMap.remove(sequenceNumber))
    +      if (processorOpt.isDefined) {
    +        val processor = processorOpt.get
    +        processor.resultQueueUpdateLock.lock()
    +        try {
    +          // Is the sequence number the same as the one the processor is 
processing? If not,
    +          // don't update {
    +          if 
(processor.eventBatch.getSequenceNumber.equals(sequenceNumber)) {
    +            processor.resultQueue.put(success)
    +          }
    +        } finally {
    +          processor.resultQueueUpdateLock.unlock()
    +        }
    +      }
    +    }
    +  }
    +  // Flume forces transactions to be thread-local (horrible, I know!)
    +  // So the sink basically spawns a new thread to pull the events out 
within a transaction.
    +  // The thread fills in the event batch object that is set before the 
thread is scheduled.
    +  // After filling it in, the thread waits on a condition - which is 
released only
    +  // when the success message comes back for the specific sequence number 
for that event batch.
    +  /**
    +   * This class represents a transaction on the Flume channel. This class 
runs a separate thread
    +   * which owns the transaction. It is blocked until the success call for 
that transaction comes
    +   * back.
    +   * @param maxBatchSize
    +   */
    +  private class TransactionProcessor(var maxBatchSize: Int) extends 
Callable[Void] {
    +    // Must be set to a new event batch before scheduling this!!
    +    val eventBatch = new EventBatch("", new 
    +    val eventQueue = new SynchronousQueue[EventBatch]()
    +    val resultQueue = new SynchronousQueue[Boolean]()
    +    val resultQueueUpdateLock = new ReentrantLock()
    +    object Zero {
    +      val zero = "0" // Oh, I miss static finals
    +    }
    +    override def call(): Void = {
    +      val tx = getChannel.getTransaction
    +      tx.begin()
    +      try {
    +        eventBatch.setSequenceNumber(seqBase + seqNum.incrementAndGet())
    +        val events = eventBatch.getEventBatch
    +        events.clear()
    +        val loop = new Breaks
    +        var gotEventsInThisTxn = false
    +        loop.breakable {
    +          var i = 0
    +          // Using for here causes the maxBatchSize change to be 
ineffective as the Range gets
    +          // pregenerated
    +          while (i < maxBatchSize) {
    +            i += 1
    +            val eventOpt = Option(getChannel.take())
    +   => {
    +              events.add(new SparkSinkEvent(toCharSequenceMap(event
    +                .getHeaders),
    +                ByteBuffer.wrap(event.getBody)))
    +              gotEventsInThisTxn = true
    +            })
    +            if (eventOpt.isEmpty) {
    +              if (!gotEventsInThisTxn) {
    +                // To avoid sending empty batches, we wait till events are 
available backing off
    +                // between attempts to get events. Each attempt to get an 
event though causes one
    +                // iteration to be lost. To ensure that we still send back 
maxBatchSize number of
    +                // events, we cheat and increase the maxBatchSize by 1 to 
account for the lost
    +                // iteration. Even throwing an exception is expensive as 
Avro will serialize it
    +                // and send it over the wire, which is useless. Before 
incrementing though,
    +                // ensure that we are not anywhere near INT_MAX.
    +                if (maxBatchSize >= Int.MaxValue / 2) {
    +                  // Random sanity check
    +                  throw new RuntimeException("Safety exception - polled 
too many times, no events!")
    +                }
    +                maxBatchSize += 1
    +                Thread.sleep(500)
    +              } else {
    +                loop.break()
    +              }
    +            }
    +          }
    +        }
    +        // Make the data available to the sender thread
    +        eventQueue.put(eventBatch)
    +        // Wait till timeout for the ack/nack
    +        val maybeResult = Option(resultQueue.poll(transactionTimeout, 
    +        // There is a race condition here.
    +        // 1. This times out.
    +        // 2. The result is empty, so timeout exception is thrown.
    +        // 3. The ack comes in before the finally block is entered
    +        // 4. The thread with the ack has a handle to this processor,
    +        // and another thread has the same processor checked out
    +        // (since the finally block was executed and the processor checked 
back in)
    +        // 5. The thread with the ack now updates the result queue,
    +        // so the processor thinks it is the ack for the current batch.
    +        // To avoid this - update the sequence number to "0" (with or 
without a result - does not
    +        // matter).
    +        // In the ack method, check if the seq number is the same as the 
processor's -
    +        // if they are then update the result queue. Now if the
    +        // processor updates the seq number first - the ack/nack never 
updates the result. If the
    +        // ack/nack updates the
    +        // result after the timeout but before the seq number is updated 
to "0" it does not
    +        // matter - the processor would
    +        // still timeout and the result is cleared before reusing the 
    +        // Unfortunately, this needs to be done from within a lock
    +        // to make sure that the new sequence number is actually visible 
to the ack thread
    +        // (happens-before)
    +        resultQueueUpdateLock.lock()
    +        try {
    +          eventBatch.setSequenceNumber(
    +        } finally {
    +          resultQueueUpdateLock.unlock()
    +        }
    +        eventBatch.getEventBatch.clear()
    +        // If the batch failed on spark side, throw a FlumeException
    + =>
    +          if (!success) {
    +            throw new
    +                FlumeException("Spark could not accept events. The 
transaction will be retried.")
    +          }
    +        )
    +        // If the operation timed out, throw a TimeoutException
    +        if (maybeResult.isEmpty) {
    +          throw new TimeoutException("Spark did not respond within the 
timeout period of " +
    +            transactionTimeout + "seconds. Transaction will be retried")
    +        }
    +        null
    +      } catch {
    +        case e: Throwable =>
    +          try {
    +            LOG.warn("Error while attempting to remove events from the 
channel.", e)
    +            tx.rollback()
    +          } catch {
    +            case e1: Throwable => LOG.error(
    +              "Rollback failed while attempting to rollback due to commit 
failure.", e1)
    +          }
    +          null // No point rethrowing the exception
    +      } finally {
    +        // Must *always* release the caller thread
    +        eventQueue.put(ErrorEventBatch)
    +        // In the case of success coming after the timeout, but before 
resetting the seq number
    +        // remove the event from the map and then clear the value
    +        resultQueue.clear()
    +        processorMap.remove(eventBatch.getSequenceNumber)
    +        processorFactory.get.checkIn(this)
    +        tx.close()
    +      }
    +    }
    +    def toCharSequenceMap(inMap: java.util.Map[String, String]): 
    +      CharSequence] = {
    +      val charSeqMap = new util.HashMap[CharSequence, 
    +      charSeqMap.putAll(inMap)
    +      charSeqMap
    +    }
    +  }
    +  private class SparkHandlerFactory(val maxInstances: Int) {
    +    val queue = new scala.collection.mutable.Queue[TransactionProcessor]
    +    val queueModificationLock = new ReentrantLock()
    +    var currentSize = 0
    +    val waitForCheckIn = queueModificationLock.newCondition()
    +    def checkOut(n: Int): TransactionProcessor = {
    +      def getProcessor = {
    +        val processor = queue.dequeue()
    +        processor.maxBatchSize = n
    +        processor
    +      }
    +      queueModificationLock.lock()
    +      try {
    +        if (queue.size > 0) {
    +          getProcessor
    +        }
    +        else {
    +          if (currentSize < maxInstances) {
    +            currentSize += 1
    +            new TransactionProcessor(n)
    +          } else {
    +            // No events in queue and cannot initialize more!
    +            // Since currentSize never reduces, queue size increasing is 
the only hope
    +            while (queue.size == 0 && currentSize >= maxInstances) {
    +              waitForCheckIn.await()
    +            }
    +            getProcessor
    +          }
    +        }
    +      } finally {
    +        queueModificationLock.unlock()
    +      }
    +    }
    +    def checkIn(processor: TransactionProcessor) {
    +      queueModificationLock.lock()
    Hmm, let me think about how we can get rid of the lock. Since we initialize processors lazily, we still need to ensure that the total number of processors never exceeds maxInstances.

