Hi Gonzalo,
Thank you very much. I have another question:
Here are my metrics. It seems no data were lost, right?
{
  "SOURCE.KafkaSource": {
    "KafkaEventGetTimer": "11916336",
    "Type": "SOURCE",
    "EventAcceptedCount": "28938986",
    "AppendReceivedCount": "0",
    "EventReceivedCount": "28938986",
    "KafkaCommitTimer": "0",
    "KafkaEmptyCount": "0",
    "OpenConnectionCount": "0",
    "AppendBatchReceivedCount": "0",
    "AppendBatchAcceptedCount": "0",
    "StopTime": "0",
    "StartTime": "1449463617288",
    "AppendAcceptedCount": "0"
  },
  "SINK.k4": {
    "ConnectionFailedCount": "0",
    "BatchCompleteCount": "0",
    "EventDrainAttemptCount": "9",
    "ConnectionCreatedCount": "1",
    "Type": "SINK",
    "BatchEmptyCount": "823",
    "ConnectionClosedCount": "0",
    "EventDrainSuccessCount": "9",
    "StopTime": "0",
    "StartTime": "1449463610232",
    "BatchUnderflowCount": "9"
  },
  "CHANNEL.c1": {
    "EventPutSuccessCount": "28938977",
    "ChannelFillPercentage": "4.885",
    "Type": "CHANNEL",
    "EventPutAttemptCount": "28938977",
    "ChannelSize": "977",
    "StopTime": "0",
    "StartTime": "1449463610228",
    "EventTakeSuccessCount": "28938000",
    "ChannelCapacity": "20000",
    "EventTakeAttemptCount": "28938979"
  },
  "SINK.k1": {
    "ConnectionFailedCount": "0",
    "BatchCompleteCount": "28938",
    "EventDrainAttemptCount": "28938977",
    "ConnectionCreatedCount": "167",
    "Type": "SINK",
    "BatchEmptyCount": "1",
    "ConnectionClosedCount": "159",
    "EventDrainSuccessCount": "28938000",
    "StopTime": "0",
    "StartTime": "1449463610233",
    "BatchUnderflowCount": "0"
  },
  "CHANNEL.c2": {
    "EventPutSuccessCount": "9",
    "ChannelFillPercentage": "0.0",
    "Type": "CHANNEL",
    "EventPutAttemptCount": "9",
    "ChannelSize": "0",
    "StopTime": "0",
    "StartTime": "1449463610228",
    "EventTakeSuccessCount": "9",
    "ChannelCapacity": "3000",
    "EventTakeAttemptCount": "841"
  }
}
The pipeline: KafkaSource -> c1 -> k1
KafkaSource -> c2 -> k4
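If I add up the numbers they seem to balance: EventReceivedCount 28938986 =
28938977 (EventPutSuccessCount on c1) + 9 (on c2), and c1's ChannelSize of 977
equals 28938977 puts minus 28938000 successful takes, i.e. events still sitting
in the channel rather than lost.

For reference, a sketch of that fan-out in flume.conf terms (the agent and
source names are placeholders, and the selector part is a guess; the very
uneven split between c1 and c2 suggests a multiplexing selector rather than the
default replicating one):

  agent.sources = r1
  agent.channels = c1 c2
  agent.sinks = k1 k4
  agent.sources.r1.channels = c1 c2
  agent.sinks.k1.channel = c1
  agent.sinks.k4.channel = c2
  # e.g. route on a header value (the header and mapping are placeholders):
  agent.sources.r1.selector.type = multiplexing
  agent.sources.r1.selector.header = type
  agent.sources.r1.selector.mapping.special = c2
  agent.sources.r1.selector.default = c1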
But I don't know why the counter names don't match the source code. For
example:
"EventReceivedCount": "28938986",
And in KafkaSource.java, line 127:

    counter.addToEventReceivedCount(Long.valueOf(eventList.size()));

which calls into SourceCounter.java:
    public long addToEventReceivedCount(long delta) {
      return addAndGet(COUNTER_EVENTS_RECEIVED, delta);
    }
COUNTER_EVENTS_RECEIVED is defined in SourceCounter.java as:

    private static final String COUNTER_EVENTS_RECEIVED =
        "src.events.received";
So how does the counter name get translated from the source code to the
monitoring metrics?
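My guess is that the JSON names are not the internal map keys at all but the
JMX attribute names: SourceCounter extends MonitoredCounterGroup and implements
SourceCounterMBean, and for a standard MBean, JMX derives the attribute name
from the getter, so getEventReceivedCount() is exposed as "EventReceivedCount",
which the JSON monitoring then reports. A minimal, self-contained sketch of
that pattern (demo names, not the actual Flume classes):

    import java.lang.management.ManagementFactory;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    // For a standard MBean, JMX derives attribute names from the getters on
    // the *MBean interface: getEventReceivedCount() -> "EventReceivedCount".
    interface DemoSourceCounterMBean {
      long getEventReceivedCount();
    }

    public class DemoSourceCounter implements DemoSourceCounterMBean {
      // Internal key, analogous to Flume's "src.events.received";
      // it never appears in the exported metrics.
      private static final String COUNTER_EVENTS_RECEIVED = "src.events.received";
      private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

      public long addToEventReceivedCount(long delta) {
        return counters.computeIfAbsent(COUNTER_EVENTS_RECEIVED,
            k -> new AtomicLong()).addAndGet(delta);
      }

      @Override
      public long getEventReceivedCount() {
        AtomicLong c = counters.get(COUNTER_EVENTS_RECEIVED);
        return c == null ? 0L : c.get();
      }

      public static void main(String[] args) throws Exception {
        DemoSourceCounter counter = new DemoSourceCounter();
        counter.addToEventReceivedCount(28938986L);

        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("org.apache.flume.source:type=KafkaSource");
        server.registerMBean(counter, name);

        // A poller (like Flume's HTTP monitoring) reads the attribute by its
        // derived name, which is what shows up in the JSON output:
        System.out.println("EventReceivedCount = "
            + server.getAttribute(name, "EventReceivedCount"));
      }
    }

Is that the right explanation?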
Thanks.
> On 7 Dec, 2015, at 5:37 pm, Gonzalo Herreros <[email protected]> wrote:
>
> Kafka consumers keep track of their progress (the offset) in ZooKeeper (newer
> versions have an option to change that, but Flume still uses the old way).
> What happens there is that the consumer tries to fetch messages from the
> offset it knows, but Kafka comes back saying that offset is not present and
> asks the client to "reset" the offset (I'm not sure whether it resets to the
> oldest or the newest).
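>
> With the old consumer API the direction of that reset is controlled by the
> consumer's auto.offset.reset property ("smallest" or "largest"). If you want
> to pin it down, the Flume 1.6 KafkaSource passes consumer properties through
> with a "kafka." prefix; something along these lines (the agent and source
> names are placeholders):
>
>   agent.sources.r1.kafka.auto.offset.reset = smallest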
>
> That might indicate either a conflict because some other Kafka cluster is
> using the same ZooKeeper and groupId, or that the Kafka retention is so low
> that messages in the queue get deleted before they can be processed. (A third
> option is that the Kafka cluster is completely messed up.)
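>
> To check the retention you can describe the topic with the kafka-topics.sh
> tool that ships with Kafka (the topic name and ZooKeeper address are
> placeholders):
>
>   bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic log
>
> Any per-topic retention.ms override shows up under Configs; otherwise the
> broker defaults (e.g. log.retention.hours) apply.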
>
> In my view this is a Kafka issue rather than Flume's, so try to troubleshoot
> Kafka first.
> When you say "messages lost in the flume pipeline", do you mean the error you
> are getting, or do you have some other issue?
>
>
> On 7 December 2015 at 09:02, Zhishan Li <[email protected]
> <mailto:[email protected]>> wrote:
> Hi Gonzalo,
>
> Thanks for your reply, but I still cannot figure out why the offset value is
> frequently reset. I am sure the groupId is unique within my Kafka cluster.
>
> I find that some messages are lost in the Flume pipeline, but I don't know
> why. Please do me a favour.
>
> Thanks,
>
>
>
>> On 7 Dec, 2015, at 4:06 pm, Gonzalo Herreros <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> What that means is that the KafkaSource is trying to read messages from
>> where it left off the last time it was running (or at least the last time
>> some client used Kafka with the same groupId), but they have already been
>> deleted by Kafka, so it is warning you that there are messages that have
>> been missed.
>> Even if it is the first time you use the KafkaSource, maybe somebody used a
>> Kafka consumer with the same groupId long ago. It's better if you make up
>> your own groupId so you don't have strange conflicts.
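>>
>> For example (the "groupId" property name is from the Flume KafkaSource docs;
>> the agent and source names and the value itself are placeholders):
>>
>>   agent.sources.r1.groupId = my-unique-flume-group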
>>
>> Regards,
>> Gonzalo
>>
>>
>> On 7 December 2015 at 04:37, Zhishan Li <[email protected]
>> <mailto:[email protected]>> wrote:
>> When I use KafkaSource, the following error is raised:
>>
>> 07 Dec 2015 04:13:11,571 ERROR [ConsumerFetcherThread-]
>> (kafka.utils.Logging$class.error:97) - Current offset 482243452 for
>> partition [5] out of range; reset offset to 483146676
>> Current offset 482243452 for partition [log,5] out of range; reset
>> offset to 483146676
>> consumed offset: 482243452 doesn't match fetch offset: 483146676 for
>> log:5: fetched offset = 483147611: consumed offset = 482243452;
>> Consumer may lose data
>>
>> But I am using the default KafkaSource configuration.
>>
>> What is happening while the agent is running?
>>
>> Thanks
>>
>>
>
>