[
https://issues.apache.org/jira/browse/FLUME-2956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290370#comment-16290370
]
He Jiang edited comment on FLUME-2956 at 12/14/17 5:51 AM:
-----------------------------------------------------------
Sure. Basically, the original code (HiveSink.java, line 314) only sends heartbeats
on activeWriters, i.e. the writers that have just consumed events in the current
batch. My modification gives the inactive writers a chance to send a heartbeat if
one is needed.
Here's the original code snippet (HiveSink.java, line 314):
// 5) Flush all Writers
for (HiveWriter writer : activeWriters.values()) {
writer.flush(true);
}
And here's the modified version:
// 5) Flush all Writers
for (HiveWriter writer : allWriters.values()) {
if (activeWriters.values().contains(writer)) {
writer.flush(true);
} else {
writer.sendHeartbeatIfNeeded();
}
}
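To make the difference between the two loops concrete, here is a minimal, self-contained Java sketch. `StubWriter` is a hypothetical stand-in for `HiveWriter` that only records the calls it receives. The sketch assumes that the heartbeat timer has already marked every writer as needing a heartbeat, and that flushing sends any pending heartbeat. It is an illustration, not the Flume code itself.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeartbeatSketch {

  // Hypothetical stand-in for HiveWriter: it only records the calls it receives.
  static class StubWriter {
    final String name;
    // Assume the heartbeat timer has already fired for every writer.
    boolean heartbeatNeeded = true;
    final List<String> calls = new ArrayList<>();

    StubWriter(String name) {
      this.name = name;
    }

    // Flushing sends any pending heartbeat before committing the batch.
    void flush(boolean rollToNext) {
      sendHeartbeatIfNeeded();
      calls.add("flush");
    }

    void sendHeartbeatIfNeeded() {
      if (heartbeatNeeded) {
        heartbeatNeeded = false;
        calls.add("heartbeat");
      }
    }
  }

  // Original loop: only writers that received events in this batch are visited.
  static void flushOriginal(Map<String, StubWriter> activeWriters) {
    for (StubWriter writer : activeWriters.values()) {
      writer.flush(true);
    }
  }

  // Patched loop: idle writers still get a chance to send a pending heartbeat.
  static void flushPatched(Map<String, StubWriter> allWriters,
                           Map<String, StubWriter> activeWriters) {
    for (Map.Entry<String, StubWriter> entry : allWriters.entrySet()) {
      if (activeWriters.containsKey(entry.getKey())) {
        entry.getValue().flush(true);
      } else {
        entry.getValue().sendHeartbeatIfNeeded();
      }
    }
  }

  public static void main(String[] args) {
    // Hypothetical partition keys, in the spirit of hive.partition = %Y.
    Map<String, StubWriter> allWriters = new LinkedHashMap<>();
    allWriters.put("2016", new StubWriter("2016"));
    allWriters.put("2015", new StubWriter("2015"));

    // Only the "2016" partition received events in this batch.
    Map<String, StubWriter> activeWriters = new LinkedHashMap<>();
    activeWriters.put("2016", allWriters.get("2016"));

    flushPatched(allWriters, activeWriters);
    for (StubWriter writer : allWriters.values()) {
      System.out.println(writer.name + " -> " + writer.calls);
    }
  }
}
```

Run as-is, it prints `2016 -> [heartbeat, flush]` and `2015 -> [heartbeat]`. With `flushOriginal`, the idle "2015" writer would not be visited at all. The sketch checks membership with `containsKey` instead of `activeWriters.values().contains(writer)`, which scans every value on each iteration. This assumes that both maps use the same keys, as they do here.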
Please see the attachment for the modified jar, or get it here:
[^flume-hive-sink-1.8.0.jar]
> hive sink not sending heartbeat correctly
> -----------------------------------------
>
> Key: FLUME-2956
> URL: https://issues.apache.org/jira/browse/FLUME-2956
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: 1.5.2
> Environment: linux CentOS 6.6
> Reporter: Olivier brobecker
> Attachments: flume-hive-sink-1.8.0.jar
>
>
> Flume has been configured to populate a Hive table as follows:
> agentSCDR.sinks.hive1.type = hive
> agentSCDR.sinks.hive1.channel = channel1
> agentSCDR.sinks.hive1.hive.metastore = thrift://myserver:9083
> agentSCDR.sinks.hive1.hive.txnsPerBatchAsk = 10
> agentSCDR.sinks.hive1.hive.database = myDatabase
> agentSCDR.sinks.hive1.hive.table = my_table
> agentSCDR.sinks.hive1.hive.partition = %Y
> agentSCDR.sinks.hive1.heartBeatInterval = 60
> agentSCDR.sinks.hive1.serializer = DELIMITED
> agentSCDR.sinks.hive1.serializer.delimiter = ;
> agentSCDR.sinks.hive1.serializer.serdeSeparator = ;
> agentSCDR.sinks.hive1.serializer.fieldnames = field1,field2,field3...
> My data flow is irregular at best and can have more than 30 minutes of
> inactivity, so I set heartBeatInterval to 60 s to keep my transactions
> alive.
> The issue is that the heartbeat is only sent when Flume tries to write
> data into Hive, instead of every 60 s.
> # grep -i heartbeat flume-agentSCDR.log
> 15 juil. 2016 13:40:43,008 INFO [hive-hive1-call-runner-0]
> (org.apache.flume.sink.hive.HiveWriter$2.call:238) - Sending heartbeat on
> batch TxnIds=[3755...3764] on endPoint = {metaStoreUri=...
> 15 juil. 2016 14:12:21,001 INFO [hive-hive1-call-runner-0]
> (org.apache.flume.sink.hive.HiveWriter$2.call:231) - Sending heartbeat on
> batch TxnIds=[3785...3794] on endPoint = {metaStoreUri=...
> 15 juil. 2016 14:27:56,963 INFO [hive-hive1-call-runner-0]
> (org.apache.flume.sink.hive.HiveWriter$2.call:231) - Sending heartbeat on
> batch TxnIds=[3795...3804] on endPoint = {metaStoreUri=...
> ...
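The gaps between the heartbeats in this excerpt can be worked out directly. The Java sketch below (the class name is hypothetical) uses the three timestamps copied verbatim from the log lines above and compares each gap with the configured heartBeatInterval of 60 s. It assumes that no heartbeat lines were omitted between the three entries shown.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class HeartbeatGaps {

  // Times of the three "Sending heartbeat" lines in the log excerpt (15 July 2016).
  static final List<LocalTime> HEARTBEATS = Arrays.asList(
      LocalTime.parse("13:40:43.008"),
      LocalTime.parse("14:12:21.001"),
      LocalTime.parse("14:27:56.963"));

  // Returns the gap between each pair of consecutive heartbeats.
  static Duration[] gaps(List<LocalTime> times) {
    Duration[] result = new Duration[times.size() - 1];
    for (int i = 1; i < times.size(); i++) {
      result[i - 1] = Duration.between(times.get(i - 1), times.get(i));
    }
    return result;
  }

  public static void main(String[] args) {
    Duration configured = Duration.ofSeconds(60); // heartBeatInterval = 60
    for (Duration gap : gaps(HEARTBEATS)) {
      // Locale.ROOT keeps the decimal separator a dot regardless of system locale.
      System.out.printf(Locale.ROOT, "gap = %d s (%.1f x heartBeatInterval)%n",
          gap.getSeconds(), (double) gap.toMillis() / configured.toMillis());
    }
  }
}
```

Run as-is, it reports gaps of 1897 s and 935 s. These are roughly 31.6 and 15.6 times the configured interval, rather than one heartbeat per minute.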
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)