I think there is a bug in the documentation: the example sink never closes its transaction, which is why the next call to begin() fails. Look at any one of the sinks in the code base and follow the same pattern in your own sink.
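To make the failure mode concrete, here is a minimal, self-contained sketch. It does not use Flume at all: `ToyTransaction`, `ToyChannel` and `countSuccessfulCalls` are hypothetical stand-ins, and the assumption that the channel hands back the same transaction until it is closed is my reading of the error message, not something taken from the documentation. In a real sink the corresponding change is most likely a `finally { txn.close(); }` after the catch block, which is what built-in sinks such as LoggerSink appear to do.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-ins for illustration only; these are NOT Flume's classes.
class TxnLifecycleSketch {

    enum State { NEW, OPEN, COMPLETED, CLOSED }

    static class ToyTransaction {
        State state = State.NEW;

        void begin() {
            // Mirrors the check behind the error message in the stack trace.
            if (state != State.NEW) {
                throw new IllegalStateException(
                        "begin() called when transaction is " + state + "!");
            }
            state = State.OPEN;
        }

        void commit()   { state = State.COMPLETED; }
        void rollback() { state = State.COMPLETED; }
        void close()    { state = State.CLOSED; }
    }

    static class ToyChannel {
        private final Queue<String> events = new ArrayDeque<>();
        private ToyTransaction current;

        void put(String event) { events.add(event); }
        String take() { return events.poll(); }

        // Assumption: the channel keeps handing back the same transaction
        // until that transaction has been closed.
        ToyTransaction getTransaction() {
            if (current == null || current.state == State.CLOSED) {
                current = new ToyTransaction();
            }
            return current;
        }
    }

    // One sink-style processing pass; closeTxn toggles the finally/close step.
    static void process(ToyChannel ch, boolean closeTxn) {
        ToyTransaction txn = ch.getTransaction();
        txn.begin();
        try {
            ch.take();          // a real sink would forward the event here
            txn.commit();
        } catch (RuntimeException e) {
            txn.rollback();
            throw e;
        } finally {
            if (closeTxn) {
                txn.close();    // the step missing from the quoted example
            }
        }
    }

    // Counts how many consecutive process() calls succeed before begin() fails.
    static int countSuccessfulCalls(boolean closeTxn, int attempts) {
        ToyChannel ch = new ToyChannel();
        for (int i = 0; i < attempts; i++) {
            ch.put("event-" + i);
        }
        int ok = 0;
        for (int i = 0; i < attempts; i++) {
            try {
                process(ch, closeTxn);
                ok++;
            } catch (IllegalStateException e) {
                break;  // same symptom as in the reported stack trace
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        System.out.println("with close():    " + countSuccessfulCalls(true, 3));
        System.out.println("without close(): " + countSuccessfulCalls(false, 3));
    }
}
```

In this toy model, all three calls succeed when the transaction is closed in the finally block. Without the close, only the first call succeeds and the second fails in begin() because the transaction is still COMPLETED.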
On Sunday, November 22, 2015, beargan <[email protected]> wrote:

> Sink
>
> public class MySink extends AbstractSink implements Configurable {
>   private String myProp;
>
>   @Override
>   public void configure(Context context) {
>     String myProp = context.getString("myProp", "defaultValue");
>
>     // Process the myProp value (e.g. validation)
>
>     // Store myProp for later retrieval by process() method
>     this.myProp = myProp;
>   }
>
>   @Override
>   public void start() {
>     // Initialize the connection to the external repository (e.g. HDFS) that
>     // this Sink will forward Events to ..
>   }
>
>   @Override
>   public void stop() {
>     // Disconnect from the external repository and do any
>     // additional cleanup (e.g. releasing resources or nulling-out
>     // field values) ..
>   }
>
>   @Override
>   public Status process() throws EventDeliveryException {
>     Status status = null;
>
>     // Start transaction
>     Channel ch = getChannel();
>     Transaction txn = ch.getTransaction();
>     txn.begin();
>     try {
>       // This try clause includes whatever Channel operations you want to do
>
>       Event event = ch.take();
>
>       // Send the Event to the external repository.
>       // storeSomeData(e);
>
>       txn.commit();
>       status = Status.READY;
>     } catch (Throwable t) {
>       txn.rollback();
>
>       // Log exception, handle individual exceptions as needed
>
>       status = Status.BACKOFF;
>
>       // re-throw all Errors
>       if (t instanceof Error) {
>         throw (Error)t;
>       }
>     }
>     return status;
>   }
> }
>
> Compiling and running it produces the following error:
>
> 2015-11-22 15:30:58,773 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
> java.lang.IllegalStateException: begin() called when transaction is COMPLETED!
>   at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>   at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>   at Toturial.Flume.MySink.process(MySink.java:54)
>   at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>   at java.lang.Thread.run(Unknown Source)

--
Thanks,
Hari
