Sink
public class MySink extends AbstractSink implements Configurable {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");
// Process the myProp value (e.g. validation)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external repository (e.g. HDFS) that
// this Sink will forward Events to ..
}
@Override
public void stop () {
// Disconnect from the external respository and do any
// additional cleanup (e.g. releasing resources or nulling-out
// field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
// Send the Event to the external repository.
// storeSomeData(e);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
}
return status;
}
}
Compile and execute the following error:
2015-11-22 15:30:58,773 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR
- org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to
deliver event. Exception follows.
java.lang.IllegalStateException: begin() called when transaction is COMPLETED!
at
com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at
org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
at Toturial.Flume.MySink.process(MySink.java:54)
at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Unknown Source)