Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/807#discussion_r13466028
  
    --- Diff: external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.flume.sink
    +
    +import org.apache.flume.sink.AbstractSink
    +import java.util.concurrent.locks.ReentrantLock
    +import org.apache.flume.Sink.Status
    +import org.apache.spark.flume.{SparkSinkEvent, EventBatch, SparkFlumeProtocol}
    +import scala.util.control.Breaks
    +import java.nio.ByteBuffer
    +import org.apache.flume.{FlumeException, Context}
    +import org.slf4j.LoggerFactory
    +import java.util.concurrent.atomic.AtomicLong
    +import org.apache.commons.lang.RandomStringUtils
    +import java.util.concurrent._
    +import java.util
    +import org.apache.flume.conf.{ConfigurationException, Configurable}
    +import com.google.common.util.concurrent.ThreadFactoryBuilder
    +import org.apache.avro.ipc.NettyServer
    +import org.apache.avro.ipc.specific.SpecificResponder
    +import java.net.InetSocketAddress
    +
    +class SparkSink() extends AbstractSink with Configurable {
    +  private val LOG = LoggerFactory.getLogger(this.getClass)
    +  private val lock = new ReentrantLock()
    +  private val blockingCondition = lock.newCondition()
    +
    +  // This sink will not persist sequence numbers and reuses them if it gets restarted.
    +  // So it is possible to commit a transaction which may have been meant for the sink before the
    +  // restart.
    +  // Since the new txn may not have the same sequence number we must guard against accidentally
    +  // committing
    +  // a new transaction. To reduce the probability of that happening a random string is prepended
    +  // to the sequence number.
    +  // Does not change for life of sink
    +  private val seqBase = RandomStringUtils.randomAlphanumeric(8)
    +  // Incremented for each transaction
    +  private val seqNum = new AtomicLong(0)
    +
    +  private var transactionExecutorOpt: Option[ExecutorService] = None
    +
    +  private var numProcessors: Integer = SparkSinkConfig.DEFAULT_PROCESSOR_COUNT
    +  private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
    +
    +  private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
    +
    +  private var processorFactory: Option[SparkHandlerFactory] = None
    +  private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
    +  private var port: Int = 0
    +  private var maxThreads: Int = SparkSinkConfig.DEFAULT_MAX_THREADS
    +  private var serverOpt: Option[NettyServer] = None
    +  private var running = false
    +
    +  override def start() {
    +    transactionExecutorOpt = Option(Executors.newFixedThreadPool(numProcessors,
    +      new ThreadFactoryBuilder().setDaemon(true)
    +        .setNameFormat("Spark Sink, " + getName + " Processor Thread - %d").build()))
    +
    +    processorFactory = Option(new SparkHandlerFactory(numProcessors))
    +
    +    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler())
    +
    +    // Using the constructor that takes specific thread-pools requires bringing in netty
    +    // dependencies which are being excluded in the build. In practice,
    +    // Netty dependencies are already available on the JVM as Flume would have pulled them in.
    +    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
    +
    +    serverOpt.map(server => server.start())
    +    lock.lock()
    +    try {
    +      running = true
    +    } finally {
    +      lock.unlock()
    +    }
    +    super.start()
    +  }
    +
    +  override def stop() {
    +    transactionExecutorOpt.map(executor => executor.shutdownNow())
    +    serverOpt.map(server => {
    +      server.close()
    +      server.join()
    +    })
    +    lock.lock()
    +    try {
    +      running = false
    +      blockingCondition.signalAll()
    +    } finally {
    +      lock.unlock()
    +    }
    +  }
    +
    +  override def configure(ctx: Context) {
    +    import SparkSinkConfig._
    +    hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
    +    val portOpt = Option(ctx.getInteger(CONF_PORT))
    +    if(portOpt.isDefined) {
    +      port = portOpt.get
    +    } else {
    +      throw new ConfigurationException("The Port to bind must be specified")
    +    }
    +    numProcessors = ctx.getInteger(PROCESSOR_COUNT, DEFAULT_PROCESSOR_COUNT)
    +    transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
    +    maxThreads = ctx.getInteger(CONF_MAX_THREADS, DEFAULT_MAX_THREADS)
    +  }
    +
    +  override def process(): Status = {
    +    // This method is called in a loop by the Flume framework - block it until the sink is
    +    // stopped to save CPU resources
    +    lock.lock()
    +    try {
    +      while(running) {
    +        blockingCondition.await()
    +      }
    +    } finally {
    +      lock.unlock()
    +    }
    +    Status.BACKOFF
    +  }
    +
    +
    +  // Object representing an empty batch returned by the txn processor due to some error.
    +  case object ErrorEventBatch extends EventBatch
    +
    +  private class AvroCallbackHandler() extends SparkFlumeProtocol {
    +
    +    override def getEventBatch(n: Int): EventBatch = {
    +      val processor = processorFactory.get.checkOut(n)
    +      transactionExecutorOpt.map(executor => executor.submit(processor))
    +      // Wait until a batch is available - can be null if some error was thrown
    +      val eventBatch = processor.eventQueue.take()
    +      eventBatch match {
    +        case ErrorEventBatch => throw new FlumeException("Something went wrong. No events" +
    +          " retrieved from channel.")
    +        case events => {
    +          processorMap.put(events.getSequenceNumber, processor)
    +          if (LOG.isDebugEnabled) {
    +            LOG.debug("Sent " + events.getEventBatch.size() +
    +              " events with sequence number: " + events.getSequenceNumber)
    +          }
    +          events
    +        }
    +      }
    +    }
    +
    +    override def ack(sequenceNumber: CharSequence): Void = {
    +      completeTransaction(sequenceNumber, success = true)
    +      null
    +    }
    +
    +    override def nack(sequenceNumber: CharSequence): Void = {
    +      completeTransaction(sequenceNumber, success = false)
    +      LOG.info("Spark failed to commit transaction. Will reattempt events.")
    +      null
    +    }
    +
    +    def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
    +      val processorOpt = Option(processorMap.remove(sequenceNumber))
    +      if (processorOpt.isDefined) {
    +        val processor = processorOpt.get
    +        processor.resultQueueUpdateLock.lock()
    +        try {
    +          // Is the sequence number the same as the one the processor is processing? If not,
    +          // don't update
    +          if (processor.eventBatch.getSequenceNumber.equals(sequenceNumber)) {
    +            processor.resultQueue.put(success)
    +          }
    +        } finally {
    +          processor.resultQueueUpdateLock.unlock()
    +        }
    +      }
    +    }
    +  }
    +
    +  // Flume forces transactions to be thread-local (horrible, I know!)
    +  // So the sink basically spawns a new thread to pull the events out within a transaction.
    +  // The thread fills in the event batch object that is set before the thread is scheduled.
    +  // After filling it in, the thread waits on a condition - which is released only
    +  // when the success message comes back for the specific sequence number for that event batch.
    +  /**
    +   * This class represents a transaction on the Flume channel. This class runs a separate thread
    +   * which owns the transaction. It is blocked until the success call for that transaction comes
    +   * back.
    +   * back.
    +   * @param maxBatchSize the maximum number of events to take from the channel in one batch
    +   */
    +  private class TransactionProcessor(var maxBatchSize: Int) extends Callable[Void] {
    +    // Must be set to a new event batch before scheduling this!!
    +    val eventBatch = new EventBatch("", new util.LinkedList[SparkSinkEvent])
    +    val eventQueue = new SynchronousQueue[EventBatch]()
    +    val resultQueue = new SynchronousQueue[Boolean]()
    +    val resultQueueUpdateLock = new ReentrantLock()
    +
    +    object Zero {
    +      val zero = "0" // Oh, I miss static finals
    +    }
    +
    +
    +    override def call(): Void = {
    +      val tx = getChannel.getTransaction
    +      tx.begin()
    +      try {
    +        eventBatch.setSequenceNumber(seqBase + seqNum.incrementAndGet())
    +        val events = eventBatch.getEventBatch
    +        events.clear()
    +        val loop = new Breaks
    +        var gotEventsInThisTxn = false
    +        loop.breakable {
    +          var i = 0
    +          // Using for here causes the maxBatchSize change to be ineffective as the Range gets
    +          // pregenerated
    +          while (i < maxBatchSize) {
    +            i += 1
    +            val eventOpt = Option(getChannel.take())
    +            eventOpt.map(event => {
    +              events.add(new SparkSinkEvent(toCharSequenceMap(event
    +                .getHeaders),
    +                ByteBuffer.wrap(event.getBody)))
    +              gotEventsInThisTxn = true
    +            })
    +            if (eventOpt.isEmpty) {
    +              if (!gotEventsInThisTxn) {
    +                // To avoid sending empty batches, we wait till events are available backing off
    +                // between attempts to get events. Each attempt to get an event though causes one
    +                // iteration to be lost. To ensure that we still send back maxBatchSize number of
    +                // events, we cheat and increase the maxBatchSize by 1 to account for the lost
    +                // iteration. Even throwing an exception is expensive as Avro will serialize it
    +                // and send it over the wire, which is useless. Before incrementing though,
    +                // ensure that we are not anywhere near INT_MAX.
    +                if (maxBatchSize >= Int.MaxValue / 2) {
    +                  // Random sanity check
    +                  throw new RuntimeException("Safety exception - polled too many times, no events!")
    +                }
    +                maxBatchSize += 1
    +                Thread.sleep(500)
    +              } else {
    +                loop.break()
    +              }
    +            }
    +          }
    +        }
    +        // Make the data available to the sender thread
    +        eventQueue.put(eventBatch)
    +
    +        // Wait till timeout for the ack/nack
    +        val maybeResult = Option(resultQueue.poll(transactionTimeout, TimeUnit.SECONDS))
    +        // There is a race condition here.
    +        // 1. This times out.
    +        // 2. The result is empty, so timeout exception is thrown.
    +        // 3. The ack comes in before the finally block is entered
    +        // 4. The thread with the ack has a handle to this processor,
    +        // and another thread has the same processor checked out
    +        // (since the finally block was executed and the processor checked back in)
    +        // 5. The thread with the ack now updates the result queue,
    +        // so the processor thinks it is the ack for the current batch.
    +        // To avoid this - update the sequence number to "0" (with or without a result - does not
    +        // matter).
    +        // In the ack method, check if the seq number is the same as the processor's -
    +        // if they are then update the result queue. Now if the
    +        // processor updates the seq number first - the ack/nack never updates the result. If the
    +        // ack/nack updates the
    +        // result after the timeout but before the seq number is updated to "0" it does not
    +        // matter - the processor would
    +        // still timeout and the result is cleared before reusing the processor.
    +        // Unfortunately, this needs to be done from within a lock
    +        // to make sure that the new sequence number is actually visible to the ack thread
    +        // (happens-before)
    +        resultQueueUpdateLock.lock()
    +        try {
    +          eventBatch.setSequenceNumber(Zero.zero)
    +        } finally {
    +          resultQueueUpdateLock.unlock()
    +        }
    +        eventBatch.getEventBatch.clear()
    +        // If the batch failed on spark side, throw a FlumeException
    +        maybeResult.map(success =>
    +          if (!success) {
    +            throw new
    +                FlumeException("Spark could not accept events. The transaction will be retried.")
    +          }
    +        )
    +        // If the operation timed out, throw a TimeoutException
    +        if (maybeResult.isEmpty) {
    +          throw new TimeoutException("Spark did not respond within the timeout period of " +
    +            transactionTimeout + " seconds. Transaction will be retried")
    +        }
    +        null
    +      } catch {
    +        case e: Throwable =>
    +          try {
    +            LOG.warn("Error while attempting to remove events from the channel.", e)
    +            tx.rollback()
    +          } catch {
    +            case e1: Throwable => LOG.error(
    +              "Rollback failed while attempting to rollback due to commit 
failure.", e1)
    +          }
    +          null // No point rethrowing the exception
    +      } finally {
    +        // Must *always* release the caller thread
    +        eventQueue.put(ErrorEventBatch)
    +        // In the case of success coming after the timeout, but before resetting the seq number
    +        // remove the event from the map and then clear the value
    +        resultQueue.clear()
    +        processorMap.remove(eventBatch.getSequenceNumber)
    +        processorFactory.get.checkIn(this)
    +        tx.close()
    +      }
    +    }
    +
    +    def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
    +      CharSequence] = {
    +      val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
    +      charSeqMap.putAll(inMap)
    +      charSeqMap
    +    }
    +  }
    +
    +  private class SparkHandlerFactory(val maxInstances: Int) {
    +    val queue = new scala.collection.mutable.Queue[TransactionProcessor]
    +    val queueModificationLock = new ReentrantLock()
    +    var currentSize = 0
    +    val waitForCheckIn = queueModificationLock.newCondition()
    +
    +    def checkOut(n: Int): TransactionProcessor = {
    +      def getProcessor = {
    +        val processor = queue.dequeue()
    +        processor.maxBatchSize = n
    +        processor
    +      }
    +      queueModificationLock.lock()
    +      try {
    +        if (queue.size > 0) {
    +          getProcessor
    +        }
    +        else {
    +          if (currentSize < maxInstances) {
    +            currentSize += 1
    +            new TransactionProcessor(n)
    +          } else {
    +            // No events in queue and cannot initialize more!
    +            // Since currentSize never reduces, queue size increasing is the only hope
    +            while (queue.size == 0 && currentSize >= maxInstances) {
    +              waitForCheckIn.await()
    +            }
    +            getProcessor
    +          }
    +        }
    +      } finally {
    +        queueModificationLock.unlock()
    +      }
    +    }
    +
    +    def checkIn(processor: TransactionProcessor) {
    +      queueModificationLock.lock()
    --- End diff --
    
    If the goal of the queue is to buffer items (i.e., a `checkIn` that can be picked up by a different thread, the one that calls `checkOut`), then it's simpler to use something like an [ArrayBlockingQueue](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ArrayBlockingQueue.html) or [LinkedBlockingQueue](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html). No need for reentrant locks.
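    
    For illustration only (not part of the patch), a minimal sketch of what the factory could look like with a `LinkedBlockingQueue`. It assumes the `TransactionProcessor` class and its `maxBatchSize` field from the diff above; the `BlockingHandlerFactory` name and the eager pre-population of the pool are assumed here for the sketch and are not in this PR:
    
    ```scala
    import java.util.concurrent.LinkedBlockingQueue
    
    // Hypothetical sketch, meant to sit inside SparkSink next to the existing
    // TransactionProcessor: pre-populate the pool once and rely on the queue's own
    // blocking semantics instead of a ReentrantLock + Condition pair.
    private class BlockingHandlerFactory(maxInstances: Int) {
      private val pool = new LinkedBlockingQueue[TransactionProcessor]()
      // Eagerly create the processors; the batch size is overwritten on checkOut.
      (1 to maxInstances).foreach(_ => pool.put(new TransactionProcessor(1)))
    
      def checkOut(n: Int): TransactionProcessor = {
        val processor = pool.take() // blocks until a processor is checked back in
        processor.maxBatchSize = n
        processor
      }
    
      def checkIn(processor: TransactionProcessor): Unit = pool.put(processor)
    }
    ```
    
    This trades the lazy creation in the current `SparkHandlerFactory` for eager creation up front, but `take()`/`put()` give the same blocking hand-off between the Avro handler thread and the transaction threads without an explicit lock or condition variable.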

