Hi Gonzalo,

Thank you very much. I have another question:

Here are my metrics. It seems no data was lost, right?

{
  "SOURCE.KafkaSource": {
    "KafkaEventGetTimer": "11916336",
    "Type": "SOURCE",
    "EventAcceptedCount": "28938986",
    "AppendReceivedCount": "0",
    "EventReceivedCount": "28938986",
    "KafkaCommitTimer": "0",
    "KafkaEmptyCount": "0",
    "OpenConnectionCount": "0",
    "AppendBatchReceivedCount": "0",
    "AppendBatchAcceptedCount": "0",
    "StopTime": "0",
    "StartTime": "1449463617288",
    "AppendAcceptedCount": "0"
  },
  "SINK.k4": {
    "ConnectionFailedCount": "0",
    "BatchCompleteCount": "0",
    "EventDrainAttemptCount": "9",
    "ConnectionCreatedCount": "1",
    "Type": "SINK",
    "BatchEmptyCount": "823",
    "ConnectionClosedCount": "0",
    "EventDrainSuccessCount": "9",
    "StopTime": "0",
    "StartTime": "1449463610232",
    "BatchUnderflowCount": "9"
  },
  "CHANNEL.c1": {
    "EventPutSuccessCount": "28938977",
    "ChannelFillPercentage": "4.885",
    "Type": "CHANNEL",
    "EventPutAttemptCount": "28938977",
    "ChannelSize": "977",
    "StopTime": "0",
    "StartTime": "1449463610228",
    "EventTakeSuccessCount": "28938000",
    "ChannelCapacity": "20000",
    "EventTakeAttemptCount": "28938979"
  },
  "SINK.k1": {
    "ConnectionFailedCount": "0",
    "BatchCompleteCount": "28938",
    "EventDrainAttemptCount": "28938977",
    "ConnectionCreatedCount": "167",
    "Type": "SINK",
    "BatchEmptyCount": "1",
    "ConnectionClosedCount": "159",
    "EventDrainSuccessCount": "28938000",
    "StopTime": "0",
    "StartTime": "1449463610233",
    "BatchUnderflowCount": "0"
  },
  "CHANNEL.c2": {
    "EventPutSuccessCount": "9",
    "ChannelFillPercentage": "0.0",
    "Type": "CHANNEL",
    "EventPutAttemptCount": "9",
    "ChannelSize": "0",
    "StopTime": "0",
    "StartTime": "1449463610228",
    "EventTakeSuccessCount": "9",
    "ChannelCapacity": "3000",
    "EventTakeAttemptCount": "841"
  }
}


The pipeline: KafkaSource -> c1 -> k1
              KafkaSource -> c2 -> k4
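
If I am reading the numbers right, they seem to add up:

    SOURCE EventReceivedCount    28938986
  = c1 EventPutSuccessCount      28938977
  + c2 EventPutSuccessCount             9

    c1 EventPutSuccessCount      28938977
  = c1 EventTakeSuccessCount     28938000  (= k1 EventDrainSuccessCount)
  + c1 ChannelSize                    977

and likewise for c2: 9 events put, 9 taken, 9 drained by k4. So every event the
source received is either delivered by a sink or still sitting in c1.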

But I don’t understand why the counter names don’t match the source code. For example:
"EventReceivedCount": "28938986",
And in KafkaSource.java,
line 127: counter.addToEventReceivedCount(Long.valueOf(eventList.size()));


which calls into SourceCounter.java:
public long addToEventReceivedCount(long delta) {
  return addAndGet(COUNTER_EVENTS_RECEIVED, delta);
}
COUNTER_EVENTS_RECEIVED is defined in SourceCounter.java as below:
private static final String COUNTER_EVENTS_RECEIVED =
    "src.events.received";


Is that how the counter name gets translated from the source code into the monitoring metrics?

Thanks.




> On 7 Dec, 2015, at 5:37 pm, Gonzalo Herreros <[email protected]> wrote:
> 
> Kafka consumers keep track of the progress made (offset) in Zookeeper (there 
> is an option in the newer versions to change that, but Flume still uses the 
> old way).
> What happens there is that the consumer tries to get messages from the 
> offset it knows, but Kafka comes back saying that offset is not present and 
> requests the client to "reset" the offset (not sure if it resets to the 
> oldest or newest).
> 
> That might indicate either a conflict because some other kafka cluster is 
> using the same Zookeeper and groupId, or that the kafka retention is so low 
> that messages in the queue get deleted before they can be processed. (A third 
> option is that the Kafka cluster is completely messed up).
> 
> In my view, this is a Kafka issue and not Flume's; try to troubleshoot Kafka 
> first.
> When you say "messages lost in the flume pipeline", do you mean the error 
> you are getting, or do you have some other issue?
> 
> 
> On 7 December 2015 at 09:02, Zhishan Li <[email protected]> wrote:
> Hi Gonzalo,
> 
> Thanks for your reply. But I still cannot figure out why the offset value is 
> frequently reset. I am sure the groupId is unique for my Kafka cluster.
> 
> I find that some messages are lost in the Flume pipeline, but I don’t know 
> the reason. Please do me a favour.
> 
> Thanks,
> 
> 
> 
>> On 7 Dec, 2015, at 4:06 pm, Gonzalo Herreros <[email protected]> wrote:
>> 
>> What that means is that the KafkaSource is trying to read messages from the 
>> last time it was running (or at least the last time some client used Kafka 
>> with the same groupId), but they have already been deleted by Kafka, so it 
>> is warning you that there are messages that have been missed.
>> Even if it is the first time you use the KafkaSource, maybe somebody used a 
>> Kafka consumer with the same groupId long ago. It's better if you make up 
>> your own groupId so you don't have strange conflicts.
>> 
>> Regards,
>> Gonzalo
>> 
>> 
>> On 7 December 2015 at 04:37, Zhishan Li <[email protected]> wrote:
>> When I use KafkaSource, the following error is raised:
>> 
>>      07 Dec 2015 04:13:11,571 ERROR [ConsumerFetcherThread-] 
>> (kafka.utils.Logging$class.error:97)  - Current offset 482243452 for 
>> partition [5] out of range; reset offset to 483146676      
>>      Current offset 482243452 for partition [log,5] out of range; reset 
>> offset to 483146676
>>      consumed offset: 482243452 doesn't match fetch offset: 483146676 for 
>> log:5: fetched offset = 483147611: consumed offset = 482243452;
>>      Consumer may lose data
>> 
>> I am using the default configuration of KafkaSource. 
>> 
>> What is happening while the agent is running?
>> 
>> Thanks
>> 
>> 
> 
> 
