Hi,

Do you mean this 5-second gap?

2018-05-22 00:03:49,533 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.sequoiadb.SequoiaDBOneSink.process(SequoiaDBOneSink.java:341)] transaction close timestamp is :1526918629533
2018-05-22 00:03:54,538 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.sequoiadb.SequoiaDBOneSink.process(SequoiaDBOneSink.java:310)] transaction begin timestamp is :1526918634538

This is how the polling thread normally works. The polling thread calls the process() method on the sink periodically. If there are no events to process, the sink implementation should return Status.BACKOFF (as your sink does). In that case the polling thread does not call the sink again immediately, but sleeps for a while. The sleep periods are 1, 2, 3, 4, 5, 5, 5, ... seconds, for as long as consecutive sink calls keep returning BACKOFF.

Based on the log ("txnEventCount is :0"), the sink did not process any events, so the waiting periods seem normal.

For further details, please see the PollingRunner class in SinkRunner:
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java

Regards,
Peter Turcsanyi

On Mon, May 21, 2018 at 5:43 PM, 基勇 <252637...@qq.com> wrote:

> Hi, guys:
> I use Flume to read data and write it to SequoiaDB. The channel is a Kafka
> channel, and the sink is a Flume SequoiaDB sink that I developed myself.
> While timing the writes to SequoiaDB, I see that going from transaction
> close to the next transaction begin takes 3-5 seconds.
> Why does it take so long between closing one transaction and opening the next?
> Can anyone help me?
> Thanks
>
> code:
>
> @Override
> public Status process() throws EventDeliveryException {
>     // TODO Auto-generated method stub
>     Channel channel = getChannel();
>     Transaction transaction = channel.getTransaction();
>     LOG.info("transaction begin timestamp is :" + System.currentTimeMillis());
>     transaction.begin();
>     boolean success = false;
>
>     try {
>         int txnEventCount = drainOne(channel);
>         transaction.commit();
>         success = true;
>         LOG.info("transaction commit timestamp is :" + System.currentTimeMillis());
>         if (txnEventCount < 1) {
>             return Status.BACKOFF;
>         } else {
>             return Status.READY;
>         }
>     } catch (BaseException e) {
>         LOG.error(e.getMessage(), e);
>         return Status.BACKOFF;
>     } catch (InterruptedException e) {
>         LOG.error(e.getMessage(), e);
>         return Status.BACKOFF;
>     } catch (Exception e) {
>         throw new EventDeliveryException(e);
>     } finally {
>         if (!success) {
>             transaction.rollback();
>         }
>         transaction.close();
>         LOG.info("transaction close timestamp is :" + System.currentTimeMillis());
>     }
> }
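
P.S. As a rough illustration of the backoff schedule described in the reply (1, 2, 3, 4, 5, 5, 5, ... seconds), here is a minimal Java sketch. It is not the actual Flume code: the class name BackoffSketch, the two helper methods, and the two constants (a 1000 ms increment and a 5000 ms cap) are assumptions chosen only to reproduce that sequence. The reset of the counter after a non-BACKOFF result is likewise an assumption based on the word "consecutive". Please check the SinkRunner source linked in the reply for the real implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSketch {
    // Assumed values, picked to match the 1, 2, 3, 4, 5, 5, 5, ... second
    // sequence described in the reply; not read from any Flume configuration.
    static final long BACKOFF_SLEEP_INCREMENT_MS = 1000L;
    static final long MAX_BACKOFF_SLEEP_MS = 5000L;

    // Sleep time after the n-th consecutive BACKOFF result (n starts at 1):
    // grows linearly with n and is capped at the maximum.
    static long sleepMillis(long consecutiveBackoffs) {
        return Math.min(consecutiveBackoffs * BACKOFF_SLEEP_INCREMENT_MS,
                        MAX_BACKOFF_SLEEP_MS);
    }

    // Simulates a polling loop. Each array element says whether that call to
    // the sink returned BACKOFF (true) or READY (false). The result holds the
    // sleep time in milliseconds after each call (0 means "call again at once").
    static List<Long> simulate(boolean[] returnedBackoff) {
        List<Long> sleeps = new ArrayList<>();
        long consecutive = 0;
        for (boolean backoff : returnedBackoff) {
            if (backoff) {
                consecutive++;
                sleeps.add(sleepMillis(consecutive));
            } else {
                // Assumption: a READY result resets the consecutive counter.
                consecutive = 0;
                sleeps.add(0L);
            }
        }
        return sleeps;
    }

    public static void main(String[] args) {
        // Seven idle polls in a row, as with an empty channel.
        boolean[] idle = {true, true, true, true, true, true, true};
        System.out.println(simulate(idle));
        // prints [1000, 2000, 3000, 4000, 5000, 5000, 5000]
    }
}
```

Under these assumptions, a channel that stays empty settles at a 5-second pause between process() calls, which is consistent with the gap between the "transaction close" and "transaction begin" log lines.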