[jira] [Created] (FLUME-3190) flume shutdown hook issue when both hbase and hdfs sink are in use
Yuexin Zhang created FLUME-3190:
-----------------------------------

             Summary: flume shutdown hook issue when both hbase and hdfs sink are in use
                 Key: FLUME-3190
                 URL: https://issues.apache.org/jira/browse/FLUME-3190
             Project: Flume
          Issue Type: Bug
    Affects Versions: 1.6.0
            Reporter: Yuexin Zhang
            Priority: Major

When both the HDFS and HBase sinks are in use, during shutdown (SIGTERM), the HDFS sink cannot rename/close the .tmp HDFS file, because the underlying filesystem may already have been closed while the other component was shutting down:

{code:java}
2017/10/23 15:34:50,858 ERROR (AbstractHDFSWriter.hflushOrSync:268) - Error while trying to hflushOrSync!
2017/10/23 15:34:50,859 WARN  (BucketWriter.close:400) - failed to close() HDFSWriter for file (/tmp/bothSource/FlumeData.1508744083526.tmp). Exception follows.
java.io.IOException: Filesystem closed
	at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:860)
	at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2388)
	at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:2334)
	at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flume.sink.hdfs.AbstractHDFSWriter.hflushOrSync(AbstractHDFSWriter.java:265)
	at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:134)
	at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:327)
	at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:323)
	at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
	at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
	at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

The root cause is the HBase client's DynamicClassLoader (see DynamicClassLoader.java in HBase). HBase added a feature to load JARs from HDFS dynamically into its class loader; to do this it obtains a DistributedFileSystem object via the standard FileSystem.get(...) call. Flume's HDFS BucketWriter also uses FileSystem.get(...) (so both receive the same instance from the FileSystem cache), but Flume supplies a configuration that disables automatic close at shutdown (look for fs.automatic.close in BucketWriter.java). When the HBase sink is active, HBase indirectly shares that FileSystem object for its internal DynamicClassLoader, but it grabs the instance from the cache without specifying 'do not auto-close at shutdown', because HBase itself is not affected by auto-close. Since the same FileSystem instance is now shared between a component that wants it auto-closed and one that does not, shutdown breaks the Flume HDFS sink.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
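The cache-sharing failure mode described above can be illustrated with a small standalone sketch. This is a toy model, not Hadoop code: `ToyFileSystem` and its methods are hypothetical stand-ins for `org.apache.hadoop.fs.FileSystem`, whose real cache is keyed by URI scheme/authority and user, so two independent callers asking for the same HDFS URI get the same instance.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of Hadoop's FileSystem cache (hypothetical names).
class ToyFileSystem {
    private static final Map<String, ToyFileSystem> CACHE = new HashMap<>();
    private boolean closed = false;

    // Both callers asking for the same URI get the SAME cached instance,
    // regardless of the per-caller settings each intended.
    static ToyFileSystem get(String uri) {
        return CACHE.computeIfAbsent(uri, u -> new ToyFileSystem());
    }

    void close() { closed = true; }

    void hflush() {
        // Mirrors DFSClient.checkOpen() throwing "Filesystem closed".
        if (closed) throw new IllegalStateException("Filesystem closed");
    }
}

public class SharedFsDemo {
    public static void main(String[] args) {
        ToyFileSystem flumeFs = ToyFileSystem.get("hdfs://nn:8020"); // Flume's BucketWriter
        ToyFileSystem hbaseFs = ToyFileSystem.get("hdfs://nn:8020"); // HBase's DynamicClassLoader

        System.out.println(flumeFs == hbaseFs); // same cached instance

        hbaseFs.close();      // a shutdown hook closes "HBase's" handle...
        try {
            flumeFs.hflush(); // ...and Flume's writer fails like the log above
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

One common way for client code to sidestep shared-cache interference is Hadoop's `FileSystem.newInstance(...)`, which bypasses the cache and returns a private instance; whether that is the right fix for Flume or HBase here is a design question for this ticket.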
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225009#comment-16225009 ]

Will Zhang commented on FLUME-3149:
-----------------------------------

Hi [~bessbd], thanks for your comments. I've made a few changes and comments accordingly; hopefully it can get more reviews. I believe it's a valuable improvement. Best, Yifeng

> reduce cpu cost for file source transfer while still maintaining reliability
> ----------------------------------------------------------------------------
>
>                 Key: FLUME-3149
>                 URL: https://issues.apache.org/jira/browse/FLUME-3149
>             Project: Flume
>          Issue Type: Improvement
>          Components: File Channel
>            Reporter: Will Zhang
>            Assignee: Will Zhang
>
> The file channel tracks transferred events and uses a transactional mechanism to make transfers recoverable. However, this increases CPU cost due to frequent system calls (write, read, etc.), and the cost can be very high at high transfer rates. In contrast, the memory channel has no such issue and needs only about 10% of the CPU cost in the same environment, but it cannot recover if the system goes down unexpectedly.
> For sources like taildir/spooldir, I propose tracking file offsets and storing them locally to achieve reliability while still using the memory channel to reduce CPU cost. I have already implemented this feature by storing the offsets in event headers, passing them to my own "offsetMemoryChannel", and persisting these offsets to local disk; in our production this reduces CPU cost by about 90 percent.
> Please let me know if it's worthwhile to have this feature in the community version. Thank you.
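The core of the proposal quoted above is that a take-side commit persists per-file offsets in one small local write, instead of journaling every event the way the file channel does. A minimal sketch of such an offset store follows; the class name and file format are hypothetical ("offsetMemoryChannel" in the ticket names a private implementation whose details are not public), and only the idea is taken from the description:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy offset store: events carry (file, offset) in headers; on commit we keep
// the highest offset per file and persist the whole map in one small write,
// so a restart can resume from disk without a durable event journal.
public class OffsetStore {
    private final Path posFile;
    private final Map<String, Long> committed = new HashMap<>();

    OffsetStore(Path posFile) { this.posFile = posFile; }

    // Called when a take-side transaction commits; each map models the
    // headers of one event ("file" and "pos" keys are assumed names).
    void commit(List<Map<String, String>> headersBatch) throws IOException {
        for (Map<String, String> h : headersBatch) {
            committed.merge(h.get("file"), Long.parseLong(h.get("pos")), Math::max);
        }
        // One write per batch instead of per-event system calls.
        List<String> lines = new ArrayList<>();
        committed.forEach((f, p) -> lines.add(f + "\t" + p));
        Files.write(posFile, lines);
    }

    static Map<String, Long> load(Path posFile) throws IOException {
        Map<String, Long> m = new HashMap<>();
        if (Files.exists(posFile)) {
            for (String line : Files.readAllLines(posFile)) {
                String[] parts = line.split("\t");
                m.put(parts[0], Long.parseLong(parts[1]));
            }
        }
        return m;
    }

    public static void main(String[] args) throws IOException {
        Path pos = Files.createTempFile("flume-pos", ".txt");
        OffsetStore store = new OffsetStore(pos);
        store.commit(List.of(
                Map.of("file", "/var/log/app.log", "pos", "500"),
                Map.of("file", "/var/log/app.log", "pos", "1024")));
        System.out.println(OffsetStore.load(pos)); // {/var/log/app.log=1024}
    }
}
```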
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180281#comment-16180281 ]

Will Zhang commented on FLUME-3149:
-----------------------------------

Hi [~bessbd], I'll update the user guide, and hopefully someone can review this PR. Thank you. Best, Will
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177665#comment-16177665 ]

Will Zhang commented on FLUME-3149:
-----------------------------------

Hi [~bessbd], it's been a while since I opened the PR, so I wonder whether you or anyone has had time to review this patch? Thank you very much. Best, Will
[jira] [Assigned] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Will Zhang reassigned FLUME-3149:
---------------------------------

    Assignee: Will Zhang
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16161203#comment-16161203 ]

Will Zhang commented on FLUME-3149:
-----------------------------------

Hi [~bessbd], I've appended a commit; please review. Also, it seems I need permission to assign the issue to myself so that every commit can be shown here? Thank you. Best, Will
[jira] [Comment Edited] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158975#comment-16158975 ]

Will Zhang edited comment on FLUME-3149 at 9/8/17 5:29 PM:
-----------------------------------------------------------

Hi [~bessbd], I didn't quite get how the "persisted" flag could be implemented. In this file-transfer scenario, the source won't know when events are "safely" committed unless the channel tells it at the exact moment the events are taken from the channel and committed. In my understanding, an event is only "safely committed" when a sink consumes it, and as far as I know a callback to the source is necessary in the commit() method of the channel transaction. So, as you mentioned, we could implement a new event. My idea is to create a new Event class, say "NotificationEvent", which extends SimpleEvent and contains a callback function. In the commit method of the memory channel, we check whether the takeList is non-empty and whether each event is an instance of NotificationEvent. If so, the channel calls each event's callback, which uses the header info to update the position info in taildir. At the end of the commit method, it calls perhaps another callback on the event that writes the updated taildir position info to disk. This way, the changes are mainly on the taildir side; the channel-side change is small, adding only a branch that checks the takeList and invokes the callbacks in commit. I ran a simple test and it works, but I'm not sure whether it meets the requirement to "separate the channel and source". What do you think? Thank you.
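The NotificationEvent idea discussed in this thread can be sketched as follows. This is a standalone toy, not Flume code: `ToyEvent` and `ToyTransaction` are hypothetical stand-ins for Flume's `SimpleEvent` and the memory channel's transaction, and they model only the commit-time callback branch the comment describes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for Flume's Event/SimpleEvent.
class ToyEvent {
    final Map<String, String> headers = new HashMap<>();
}

// The proposed event subtype: carries a callback invoked when the channel
// transaction that took this event commits.
class NotificationEvent extends ToyEvent {
    final Runnable onCommit;
    NotificationEvent(Runnable onCommit) { this.onCommit = onCommit; }
}

// Minimal model of a memory-channel transaction's take-side commit path.
class ToyTransaction {
    final List<ToyEvent> takeList = new ArrayList<>();

    void commit() {
        // The single extra branch the comment describes: after a take-side
        // commit, notify any NotificationEvents so the source can update
        // its position file.
        for (ToyEvent e : takeList) {
            if (e instanceof NotificationEvent) {
                ((NotificationEvent) e).onCommit.run();
            }
        }
        takeList.clear();
    }
}

public class NotificationDemo {
    public static void main(String[] args) {
        Map<String, Long> positions = new HashMap<>(); // taildir-style pos info

        NotificationEvent ev = new NotificationEvent(
                () -> positions.put("/var/log/app.log", 1024L));
        ev.headers.put("file", "/var/log/app.log");
        ev.headers.put("pos", "1024");

        ToyTransaction tx = new ToyTransaction();
        tx.takeList.add(ev);
        tx.commit(); // the source's offset is updated only after commit

        System.out.println(positions);
    }
}
```

The design point under discussion is visible in the sketch: the channel stays generic (it only knows about an event type with a callback), while all offset bookkeeping lives with the source.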
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16151773#comment-16151773 ]

Will Zhang commented on FLUME-3149:
-----------------------------------

Hi [~fszabo], a PollableSourceChannel sounds like a good idea, but I have a few questions about it. Do you mean a channel that implements both the PollableSource and BasicChannelSemantics interfaces? If so, how would the PollableSource process() method work? Currently it puts events into the channel; as you mentioned, this PollableSourceChannel has no buffer, so I suppose it wouldn't need a put method and would only read events and keep track of them when they are taken in a transaction? And [~bessbd], could you clarify the idea of "a new Event implementation which has a 'persisted' flag or callback"? I suppose an event with a callback is something that calls back to the source when events are consumed and committed in the channel, so that we can keep an accurate position record. If so, this seems like a clean way to go. Also, what does the "persisted" flag mean? Please correct me if I got anything wrong. Thank you.
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144789#comment-16144789 ]

will zhang commented on FLUME-3149:
-----------------------------------

Hi [~bessbd], thank you for the clarification. Right now I can't think of a way to separate them, because the key of my idea is to let the channel inform the source when to actually commit, whereas Flume's current design decouples them. I can move the channel's "writePos" part to the source side, but the source still needs to know when the channel commits. I'll keep working on it and update here if I make any progress. Thank you.
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16142822#comment-16142822 ]

will zhang commented on FLUME-3149:
-----------------------------------

Hi [~fszabo],
1. Some of our use cases don't include Kafka, and some use only the HDFS sink, so the Kafka channel is not an option for general use.
2. I first tried increasing the batch size (from 500 to 200,000) and using a buffered output stream in the file channel. It helped a little, but not enough, still because of system calls like seek/read/write, which I analyzed with VisualVM. For a 1 MB/s file transfer over the file channel, the overall CPU cost is still more than 10% (about 13% before increasing the batch size), while the memory channel costs only about 3% at the same transfer rate.
3. IMO, file transfer is a very common use case, yet none of the channels achieves both high performance and low cost, and since reliability is often critical in production, the file channel seems to be the only option. But we don't really need to store the events again in files, since they come from local files originally. So maybe a separate channel specifically for file transfer is a better choice?
Please correct me if I got something wrong. Thank you.
[jira] [Issue Comment Deleted] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

will zhang updated FLUME-3149:
------------------------------
    Comment: was deleted

(was: Btw, just to mention it, for sinks like hdfs when using taildir, we can pass offsets without actual message body to sink side to take advantage of zeroCopy to further reduce CPU cost, which is probably another issue.)
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16142711#comment-16142711 ]

will zhang commented on FLUME-3149:
-----------------------------------

Btw, for sinks like hdfs when using taildir, we could pass offsets without the actual message body to the sink side, taking advantage of zero-copy to further reduce CPU cost; that is probably a separate issue.
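The zero-copy idea mentioned in this comment can be illustrated with standard Java NIO: if an event carries only (file, offset, length), a sink can use `FileChannel.transferTo(...)`, which lets the kernel move the bytes without copying them through the JVM heap. Whether an HDFS sink could actually use this depends on the target stream; this sketch just transfers a slice of a local file to a local channel:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch: transfer a (offset, length) slice of a file without buffering the
// bytes in user space. Names here are illustrative, not Flume APIs.
public class ZeroCopyDemo {
    static long transferSlice(Path src, long offset, long len,
                              WritableByteChannel dest) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ)) {
            long done = 0;
            while (done < len) { // transferTo may move fewer bytes than asked
                done += in.transferTo(offset + done, len - done, dest);
            }
            return done;
        }
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("zc", ".log");
        Files.writeString(f, "hello zero copy");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        transferSlice(f, 6, 9, Channels.newChannel(out));
        System.out.println(out); // zero copy
    }
}
```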
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16142706#comment-16142706 ]

will zhang commented on FLUME-3149:
-----------------------------------

Hi [~bessbd], I have made a PR. Will you kindly review it? Thank you.
[jira] [Issue Comment Deleted] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

will zhang updated FLUME-3149:
------------------------------
    Comment: was deleted

(was: pr)
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137790#comment-16137790 ] will zhang commented on FLUME-3149: --- Hi [~bessbd], Thanks for the reply. I'll patch it soon.
[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136600#comment-16136600 ] will zhang commented on FLUME-3149: --- Can anyone please help review this issue? Thanks.
[jira] [Updated] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
[ https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] will zhang updated FLUME-3149: -- Description: The file channel tracks transferred events and uses a transactional mechanism to make transfers recoverable. However, this increases CPU cost because of frequent system calls such as write and read; the CPU cost can be very high when the transfer rate is high. In contrast, the memory channel has no such issue and needs only about 10% of the CPU in the same environment, but its state is not recoverable if the system goes down unexpectedly. For sources like taildir/spooldir, I propose tracking file offsets and storing them locally to achieve reliability while still using the memory channel to reduce CPU cost. I have already implemented this feature in our production environment by storing the offsets in event headers, passing them to my own "offsetMemoryChannel", and persisting these offsets to local disk; it reduces CPU cost by about 90 percent. Please let me know whether this feature would be worthwhile in the community version. Thank you. was: The file channel tracks transferred events and uses a transactional mechanism to make transfers recoverable. However, this increases CPU cost because of frequent system calls such as write and read; the CPU cost can be very high when the transfer rate is high. In contrast, the memory channel has no such issue and needs only about 10% of the CPU in the same environment, but its state is not recoverable if the system goes down unexpectedly. For sources like taildir/spooldir, I propose tracking file offsets and storing them locally to achieve reliability while still using the memory channel to reduce CPU cost. I have already implemented this feature in our production environment; it reduces CPU cost by about 90 percent. Please let me know whether this feature would be worthwhile in the community. Thank you.
[jira] [Created] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability
will zhang created FLUME-3149: - Summary: reduce cpu cost for file source transfer while still maintaining reliability Key: FLUME-3149 URL: https://issues.apache.org/jira/browse/FLUME-3149 Project: Flume Issue Type: Improvement Components: File Channel Reporter: will zhang The file channel tracks transferred events and uses a transactional mechanism to make transfers recoverable. However, this increases CPU cost because of frequent system calls such as write and read; the CPU cost can be very high when the transfer rate is high. In contrast, the memory channel has no such issue and needs only about 10% of the CPU in the same environment, but its state is not recoverable if the system goes down unexpectedly. For sources like taildir/spooldir, I propose tracking file offsets and storing them locally to achieve reliability while still using the memory channel to reduce CPU cost. I have already implemented this feature in our production environment; it reduces CPU cost by about 90 percent. Please let me know whether this feature would be worthwhile in the community. Thank you.
[jira] [Commented] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data
[ https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055351#comment-16055351 ] Yongxi Zhang commented on FLUME-3106: - [~niklaus.xiao], hi niklaus, could you explain this in detail for me? I have thought about it for a long time, but I still don't understand it. I think it is reasonable to ignore the exception and return null when the takeList of the memory channel is full. > When batchSize of sink greater than transactionCapacity of Memory Channel, > Flume can produce endless data > - > > Key: FLUME-3106 > URL: https://issues.apache.org/jira/browse/FLUME-3106 > Project: Flume > Issue Type: Bug > Components: Channel > Affects Versions: 1.7.0 > Reporter: Yongxi Zhang > Fix For: 1.8.0 > > Attachments: FLUME-3106-0.patch > > > Flume can produce endless data when using the following config: > {code:xml} > agent.sources = src1 > agent.sinks = sink1 > agent.channels = ch2 > agent.sources.src1.type = spooldir > agent.sources.src1.channels = ch2 > agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir > agent.sources.src1.fileHeader = false > agent.sources.src1.batchSize = 5 > agent.channels.ch2.type=memory > agent.channels.ch2.capacity=100 > agent.channels.ch2.transactionCapacity=5 > agent.sinks.sink1.type = hdfs > agent.sinks.sink1.channel = ch2 > agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/ > agent.sinks.sink1.hdfs.rollInterval=1 > agent.sinks.sink1.hdfs.fileType = DataStream > agent.sinks.sink1.hdfs.writeFormat = Text > agent.sinks.sink1.hdfs.batchSize = 10 > {code} > And there are exceptions like this: > {code:xml} > org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count > at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99) > at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) > at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) > at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362) > at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) > at java.lang.Thread.run(Thread.java:745) > 17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception follows. > org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count > at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451) > at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) > at java.lang.Thread.run(Thread.java:745) > {code} > When the takeList of the memory channel is full, a ChannelException is thrown; the events in the takeList have already been written by the sink, yet they are rolled back to the memory channel's queue at the same time, which is not reasonable.
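The failure mode described in this thread can be reproduced with a small simulation. The sketch below is not Flume code; it just models the reported interaction: the sink tries to take batchSize (10) events inside one transaction, the channel's take list caps out at transactionCapacity (5), the transaction rolls back and the taken events return to the queue, but the sink has already flushed them, so every retry re-delivers the same events and the channel never drains. Class and method names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Minimal model (not Flume code) of FLUME-3106: sink batchSize greater
// than channel transactionCapacity makes every transaction roll back
// after the sink has already written part of the batch.
public class EndlessBatchDemo {
    static final int TRANSACTION_CAPACITY = 5;
    static final int SINK_BATCH_SIZE = 10;

    // Simulates one sink transaction attempt; returns the events the
    // sink "wrote" downstream during the attempt.
    static List<Integer> attempt(Deque<Integer> queue) {
        List<Integer> takeList = new ArrayList<>();
        List<Integer> written = new ArrayList<>();
        try {
            for (int i = 0; i < SINK_BATCH_SIZE && !queue.isEmpty(); i++) {
                if (takeList.size() >= TRANSACTION_CAPACITY) {
                    // Mirrors the ChannelException: "Take list ... full"
                    throw new IllegalStateException("Take list full");
                }
                Integer ev = queue.poll();
                takeList.add(ev);
                written.add(ev);  // the sink has already flushed this event
            }
            takeList.clear();     // commit: events leave the channel for good
        } catch (IllegalStateException e) {
            // Rollback: taken events go back on the queue in order,
            // but the downstream writes cannot be undone.
            for (int i = takeList.size() - 1; i >= 0; i--) {
                queue.addFirst(takeList.get(i));
            }
        }
        return written;
    }
}
```

With 7 queued events, every attempt writes events 1 through 5, throws on the sixth take, and rolls all five back, so the same five events are written on every retry and the queue size never drops; that is the "endless data" in the report.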
[jira] [Updated] (FLUME-3107) When batchSize of sink greater than transactionCapacity of File Channel, Flume can produce endless data
[ https://issues.apache.org/jira/browse/FLUME-3107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yongxi Zhang updated FLUME-3107: Attachment: FLUME-3107-0.patch -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLUME-3107) When batchSize of sink greater than transactionCapacity of File Channel, Flume can produce endless data
Yongxi Zhang created FLUME-3107: --- Summary: When batchSize of sink greater than transactionCapacity of File Channel, Flume can produce endless data Key: FLUME-3107 URL: https://issues.apache.org/jira/browse/FLUME-3107 Project: Flume Issue Type: Bug Components: File Channel Affects Versions: 1.7.0 Reporter: Yongxi Zhang Fix For: 1.8.0 This problem is similar to the one in FLUME-3106: Flume can produce endless data when the batchSize of a sink is greater than the transactionCapacity of the file channel. You can reproduce it with the following config: {code:xml} agent.sources = src1 agent.sinks = sink1 agent.channels = ch2 agent.sources.src1.type = spooldir agent.sources.src1.channels = ch2 agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir agent.sources.src1.fileHeader = false agent.sources.src1.batchSize = 5 agent.channels.ch2.type=file agent.channels.ch2.capacity=100 agent.channels.ch2.checkpointDir=/home/kafka/flumefilechannel/checkpointDir agent.channels.ch2.dataDirs=/home/kafka/flumefilechannel/dataDirs agent.channels.ch2.transactionCapacity=5 agent.sinks.sink1.type = hdfs agent.sinks.sink1.channel = ch2 agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/ agent.sinks.sink1.hdfs.rollInterval=1 agent.sinks.sink1.hdfs.fileType = DataStream agent.sinks.sink1.hdfs.writeFormat = Text agent.sinks.sink1.hdfs.batchSize = 10 {code} Exceptions like this: {code:xml} 17/06/09 17:16:18 ERROR flume.SinkRunner: Unable to deliver event. Exception follows. org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for FileBackedTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count. [channel=ch2] at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flume.ChannelException: Take list for FileBackedTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count. [channel=ch2] at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doTake(FileChannel.java:531) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362) ... 3 more {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
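Both FLUME-3106 and FLUME-3107 stem from the same misconfiguration, so agent configs can be screened for it up front. The sketch below is an illustrative check, not part of Flume: it reads a properties-style agent config and flags a sink whose hdfs.batchSize exceeds the transactionCapacity of the channel it drains. The method name is hypothetical, and the fallback defaults of 100 are an assumption based on the commonly documented Flume defaults for these two properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Illustrative config sanity check (not part of Flume): a sink's batch
// size must not exceed its channel's transactionCapacity, or every
// transaction rolls back and the agent re-delivers the same events.
public class BatchSizeCheck {
    // Returns true when sink batchSize <= channel transactionCapacity.
    // Assumes defaults of 100 for both properties when unset.
    public static boolean isSafe(Properties cfg, String agent, String sink) {
        String channel = cfg.getProperty(agent + ".sinks." + sink + ".channel");
        int batch = Integer.parseInt(
            cfg.getProperty(agent + ".sinks." + sink + ".hdfs.batchSize", "100").trim());
        int txCap = Integer.parseInt(
            cfg.getProperty(agent + ".channels." + channel + ".transactionCapacity", "100").trim());
        return batch <= txCap;
    }
}
```

Run against the config in this report (batchSize 10, transactionCapacity 5), the check fails; raising transactionCapacity to at least the sink's batch size makes it pass.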
[jira] [Commented] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data
[ https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044177#comment-16044177 ] Yongxi Zhang commented on FLUME-3106: - I think that when the takeList is full, the memory channel should return null rather than throw an exception.
[jira] [Updated] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data
[ https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yongxi Zhang updated FLUME-3106: Description: Flume can produce endless data when using the following config: {code:xml} agent.sources = src1 agent.sinks = sink1 agent.channels = ch2 agent.sources.src1.type = spooldir agent.sources.src1.channels = ch2 agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir agent.sources.src1.fileHeader = false agent.sources.src1.batchSize = 5 agent.channels.ch2.type=memory agent.channels.ch2.capacity=100 agent.channels.ch2.transactionCapacity=5 agent.sinks.sink1.type = hdfs agent.sinks.sink1.channel = ch2 agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/ agent.sinks.sink1.hdfs.rollInterval=1 agent.sinks.sink1.hdfs.fileType = DataStream agent.sinks.sink1.hdfs.writeFormat = Text agent.sinks.sink1.hdfs.batchSize = 10 {code} And there are exceptions like this: {code:xml} org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) 17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception follows. org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) {code} When the takeList of the memory channel is full, a ChannelException is thrown; the events in the takeList have already been written by the sink, yet they are rolled back to the memory channel's queue at the same time, which is not reasonable. was: Flume can produce endless data when using the following config: {code:xml} agent.sources = src1 agent.sinks = sink1 agent.channels = ch2 agent.sources.src1.type = spooldir agent.sources.src1.channels = ch2 agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir agent.sources.src1.fileHeader = false agent.sources.src1.batchSize = 5 agent.channels.ch2.type=memory agent.channels.ch2.capacity=100 agent.channels.ch2.transactionCapacity=5 agent.sinks.sink1.type = hdfs agent.sinks.sink1.channel = ch2 agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/ agent.sinks.sink1.hdfs.rollInterval=1 agent.sinks.sink1.hdfs.fileType = DataStream agent.sinks.sink1.hdfs.writeFormat = Text agent.sinks.sink1.hdfs.batchSize = 10 {code} And there are exceptions like this: {panel} org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) 17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception follows. org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) {panel} When the takeList of the memory channel is full, a ChannelException is thrown; the events in the takeList have already been written by the sink, yet they are rolled back to the memory channel's queue at the same time, which is not reasonable. > When batchSize
[jira] [Updated] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data
[ https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yongxi Zhang updated FLUME-3106: Description: Flume can produce endless data when using the following config: {code:xml} agent.sources = src1 agent.sinks = sink1 agent.channels = ch2 agent.sources.src1.type = spooldir agent.sources.src1.channels = ch2 agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir agent.sources.src1.fileHeader = false agent.sources.src1.batchSize = 5 agent.channels.ch2.type=memory agent.channels.ch2.capacity=100 agent.channels.ch2.transactionCapacity=5 agent.sinks.sink1.type = hdfs agent.sinks.sink1.channel = ch2 agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/ agent.sinks.sink1.hdfs.rollInterval=1 agent.sinks.sink1.hdfs.fileType = DataStream agent.sinks.sink1.hdfs.writeFormat = Text agent.sinks.sink1.hdfs.batchSize = 10 {code} And there are exceptions like this: {panel} org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) 17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception follows. org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) {panel} When the takeList of the memory channel is full, a ChannelException is thrown; the events in the takeList have already been written by the sink, yet they are rolled back to the memory channel's queue at the same time, which is not reasonable. was: Flume can produce endless data when using the following config: {panel} agent.sources = src1 agent.sinks = sink1 agent.channels = ch2 agent.sources.src1.type = spooldir agent.sources.src1.channels = ch2 agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir agent.sources.src1.fileHeader = false agent.sources.src1.batchSize = 5 agent.channels.ch2.type=memory agent.channels.ch2.capacity=100 agent.channels.ch2.transactionCapacity=5 agent.sinks.sink1.type = hdfs agent.sinks.sink1.channel = ch2 agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/ agent.sinks.sink1.hdfs.rollInterval=1 agent.sinks.sink1.hdfs.fileType = DataStream agent.sinks.sink1.hdfs.writeFormat = Text agent.sinks.sink1.hdfs.batchSize = 10 {panel} And there are exceptions like this: {panel} org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) 17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception follows. org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) {panel} When the takeList of the memory channel is full, a ChannelException is thrown; the events in the takeList have already been written by the sink, yet they are rolled back to the memory channel's queue at the same time, which is not reasonable. > When batchSize of
[jira] [Updated] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data
[ https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yongxi Zhang updated FLUME-3106: Attachment: FLUME-3106-0.patch
[jira] [Created] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data
Yongxi Zhang created FLUME-3106: --- Summary: When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data Key: FLUME-3106 URL: https://issues.apache.org/jira/browse/FLUME-3106 Project: Flume Issue Type: Bug Components: Channel Affects Versions: 1.7.0 Reporter: Yongxi Zhang Fix For: 1.8.0 Flume can produce endless data when using the following config: {panel} agent.sources = src1 agent.sinks = sink1 agent.channels = ch2 agent.sources.src1.type = spooldir agent.sources.src1.channels = ch2 agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir agent.sources.src1.fileHeader = false agent.sources.src1.batchSize = 5 agent.channels.ch2.type=memory agent.channels.ch2.capacity=100 agent.channels.ch2.transactionCapacity=5 agent.sinks.sink1.type = hdfs agent.sinks.sink1.channel = ch2 agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/ agent.sinks.sink1.hdfs.rollInterval=1 agent.sinks.sink1.hdfs.fileType = DataStream agent.sinks.sink1.hdfs.writeFormat = Text agent.sinks.sink1.hdfs.batchSize = 10 {panel} And there are exceptions like this: {panel} org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) 17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception follows. 
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) {panel} When the takeList of the Memory Channel is full, a ChannelException is thrown, yet the events in the takeList have already been written by the sink and are rolled back to the memoryChannel queue at the same time; this is not reasonable. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
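The mismatch described above can be avoided by keeping the channel's transactionCapacity at least as large as the sink's batch size. A minimal sketch of a corrected channel/sink pairing, based on the config quoted in the report (the values are illustrative, not a definitive recommendation):

```properties
# The memory channel's transactionCapacity bounds how many events a single
# sink transaction may take; it must be >= the HDFS sink's hdfs.batchSize,
# otherwise every sink transaction overflows the take list and rolls back,
# re-queuing events that were already written.
agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 100
agent.channels.ch2.transactionCapacity = 10

agent.sinks.sink1.hdfs.batchSize = 10
```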
[jira] [Commented] (FLUME-3070) The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only be replaced once
[ https://issues.apache.org/jira/browse/FLUME-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16025737#comment-16025737 ] Yongxi Zhang commented on FLUME-3070: - [~mpercy][~bessbd][~hshreedharan] Anybody available for review? > The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only > be replaced once > > > Key: FLUME-3070 > URL: https://issues.apache.org/jira/browse/FLUME-3070 > Project: Flume > Issue Type: Improvement >Affects Versions: 1.7.0 >Reporter: Yongxi Zhang > Attachments: FLUME-3070-0.patch > > > The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only > be replaced once, which helps the Hdfs Sink process data efficiently. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLUME-3089) flume hive sink
[ https://issues.apache.org/jira/browse/FLUME-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mengge zhang updated FLUME-3089: Description: When Flume is running, the following error is reported: 2017-05-06 21:17:19,544 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows. org.apache.flume.EventDeliveryException: java.lang.IllegalArgumentException at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalArgumentException at java.sql.Date.valueOf(Date.java:143) at org.apache.hive.hcatalog.data.JsonSerDe.extractCurrentField(JsonSerDe.java:310) at org.apache.hive.hcatalog.data.JsonSerDe.populateRecord(JsonSerDe.java:218) at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:174) at org.apache.hive.hcatalog.streaming.StrictJsonWriter.encode(StrictJsonWriter.java:156) at org.apache.hive.hcatalog.streaming.StrictJsonWriter.write(StrictJsonWriter.java:123) at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.writeImpl(HiveEndPoint.java:815) at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:782) at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:735) at org.apache.flume.sink.hive.HiveJsonSerializer.write(HiveJsonSerializer.java:42) at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:158) at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:152) at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more I do not know why; please help me. Thanks. was: When Flume is running, the following error is reported: 2017-05-06 21:17:19,544 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows. org.apache.flume.EventDeliveryException: java.lang.IllegalArgumentException at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalArgumentException at java.sql.Date.valueOf(Date.java:143) at org.apache.hive.hcatalog.data.JsonSerDe.extractCurrentField(JsonSerDe.java:310) at org.apache.hive.hcatalog.data.JsonSerDe.populateRecord(JsonSerDe.java:218) at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:174) at org.apache.hive.hcatalog.streaming.StrictJsonWriter.encode(StrictJsonWriter.java:156) at org.apache.hive.hcatalog.streaming.StrictJsonWriter.write(StrictJsonWriter.java:123) at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.writeImpl(HiveEndPoint.java:815) at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:782) at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:735) at org.apache.flume.sink.hive.HiveJsonSerializer.write(HiveJsonSerializer.java:42) at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:158) at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:152) at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more > flume hive sink > > > Key: FLUME-3089 > URL: https://issues.apache.org/jira/browse/FLUME-3089 > Project: Flume > Issue Type: Bug > Environment: system:Ubuntu 16.04.1 LTS \n \l > flume:apache-flume-1.7.0-bin.tar.gz > hive:apache-hive-2.1.1-bin >Reporter: mengge zhang > > When Flume is running, the following error is reported: > 2017-05-06 21:17:19,544 (SinkRunner-PollingRunner-DefaultSinkProcessor) > [ERROR -
[jira] [Created] (FLUME-3089) flume hive sink
mengge zhang created FLUME-3089: --- Summary: flume hive sink Key: FLUME-3089 URL: https://issues.apache.org/jira/browse/FLUME-3089 Project: Flume Issue Type: Bug Environment: system:Ubuntu 16.04.1 LTS \n \l flume:apache-flume-1.7.0-bin.tar.gz hive:apache-hive-2.1.1-bin Reporter: mengge zhang When Flume is running, the following error is reported: 2017-05-06 21:17:19,544 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows. org.apache.flume.EventDeliveryException: java.lang.IllegalArgumentException at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalArgumentException at java.sql.Date.valueOf(Date.java:143) at org.apache.hive.hcatalog.data.JsonSerDe.extractCurrentField(JsonSerDe.java:310) at org.apache.hive.hcatalog.data.JsonSerDe.populateRecord(JsonSerDe.java:218) at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:174) at org.apache.hive.hcatalog.streaming.StrictJsonWriter.encode(StrictJsonWriter.java:156) at org.apache.hive.hcatalog.streaming.StrictJsonWriter.write(StrictJsonWriter.java:123) at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.writeImpl(HiveEndPoint.java:815) at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:782) at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:735) at org.apache.flume.sink.hive.HiveJsonSerializer.write(HiveJsonSerializer.java:42) at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:158) at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:152) at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more -- This message was sent by Atlassian JIRA (v6.3.15#6346)
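The root cause is visible in the bottom frames of the trace above: java.sql.Date.valueOf only accepts strings in yyyy-[m]m-[d]d form and throws IllegalArgumentException otherwise, so a date field in the incoming JSON that uses any other layout will fail inside JsonSerDe and kill the Hive sink's write. A standalone sketch (not Flume code) reproducing that behavior:

```java
import java.sql.Date;

public class DateValueOfDemo {
    public static void main(String[] args) {
        // A JDBC date-escape-format string parses fine.
        Date ok = Date.valueOf("2017-05-06");
        System.out.println(ok); // prints 2017-05-06

        // Any other layout throws the same IllegalArgumentException seen in
        // the Hive sink trace (JsonSerDe.extractCurrentField -> Date.valueOf).
        try {
            Date.valueOf("06/05/2017");
        } catch (IllegalArgumentException e) {
            System.out.println("IllegalArgumentException: unsupported date format");
        }
    }
}
```

If this is the cause here, reformatting the date fields in the source events to yyyy-MM-dd should let JsonSerDe deserialize the records.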
[jira] [Updated] (FLUME-3070) The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only be replaced once
[ https://issues.apache.org/jira/browse/FLUME-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yongxi Zhang updated FLUME-3070: Attachment: FLUME-3070-0.patch > The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only > be replaced once > > > Key: FLUME-3070 > URL: https://issues.apache.org/jira/browse/FLUME-3070 > Project: Flume > Issue Type: Improvement >Affects Versions: v1.7.0 >Reporter: Yongxi Zhang > Attachments: FLUME-3070-0.patch > > > The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only > be replaced once, which helps the Hdfs Sink process data efficiently. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLUME-3070) The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only be replaced once
Yongxi Zhang created FLUME-3070: --- Summary: The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only be replaced once Key: FLUME-3070 URL: https://issues.apache.org/jira/browse/FLUME-3070 Project: Flume Issue Type: Improvement Affects Versions: v1.7.0 Reporter: Yongxi Zhang The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only be replaced once, which helps the Hdfs Sink process data efficiently. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
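The proposed improvement can be sketched as follows. This is an illustrative standalone example, not the actual HDFS sink code: the class name, method names, and the plain String.replace substitution are all assumptions. The point is only that the three host-based escapes resolve to values that are constant for the lifetime of the agent, so they can be computed once and cached instead of being re-resolved for every event:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

public class HostEscapeCache {
    private static Map<String, String> cache;

    // Resolve %[localhost], %[IP], and %[FQDN] exactly once; the local host
    // does not change while the agent runs, so per-event resolution is waste.
    static synchronized Map<String, String> hostEscapes() throws UnknownHostException {
        if (cache == null) {
            InetAddress addr = InetAddress.getLocalHost();
            cache = new HashMap<>();
            cache.put("%[localhost]", addr.getHostName());
            cache.put("%[IP]", addr.getHostAddress());
            cache.put("%[FQDN]", addr.getCanonicalHostName());
        }
        return cache;
    }

    // Substitute the cached values into a path template.
    static String replaceShorthand(String path) throws UnknownHostException {
        String result = path;
        for (Map.Entry<String, String> e : hostEscapes().entrySet()) {
            result = result.replace(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(replaceShorthand("/flume/%[localhost]/data"));
    }
}
```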
[jira] [Created] (FLUME-2915) The kafka channel using new APIs will be stuck when the sink is avro sink
Julian Zhang created FLUME-2915: --- Summary: The kafka channel using new APIs will be stuck when the sink is avro sink Key: FLUME-2915 URL: https://issues.apache.org/jira/browse/FLUME-2915 Project: Flume Issue Type: Bug Components: Channel Affects Versions: v1.7.0 Reporter: Julian Zhang The avro sink was stuck when I used the kafka channel that uses the new APIs. After a couple of hours I found the issue at KafkaChannel.java#L384: e.getHeaders().put(KEY_HEADER, record.key()); and changed it to: if (record.key() != null) { e.getHeaders().put(KEY_HEADER, record.key()); } The reason is that record.key() can be null if the user did not set it, and Avro serialization of the event will then throw a NullPointerException. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
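The guarded put described in the report can be illustrated in isolation. This is a hypothetical standalone sketch, not the actual KafkaChannel code; the class and method names are invented for the example. It only demonstrates the shape of the fix: skip attaching the Kafka record key as an event header when it is null, so later serialization of the headers never sees a null value:

```java
import java.util.HashMap;
import java.util.Map;

public class KeyHeaderGuard {
    static final String KEY_HEADER = "key";

    // Only attach the record key as a header when it is actually present;
    // a null-valued header is what later trips up Avro serialization.
    static Map<String, String> buildHeaders(String recordKey) {
        Map<String, String> headers = new HashMap<>();
        if (recordKey != null) {
            headers.put(KEY_HEADER, recordKey);
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(buildHeaders(null)); // prints {} - no null header
        System.out.println(buildHeaders("k1")); // prints {key=k1}
    }
}
```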
[jira] [Commented] (FLUME-2653) Allow inUseSuffix to be null/empty
[ https://issues.apache.org/jira/browse/FLUME-2653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958778#comment-14958778 ] Ke Zhang commented on FLUME-2653: - I got this error message when I was trying to update flume settings via cloudera manager: {quote}An error occurred while validating this configuration: Component tsv_agent.sinks.hdfs-sink-view.hdfs.inUseSuffix: Property value missing.{quote} As a workaround I extended the HDFSEventSink like below: {code:title=MyHDFSEventSink.java|borderStyle=solid} public static final String IN_USE_SUFFIX_ENABLED_KEY = "hdfs.inUseSuffixEnabled"; public static final String IN_USE_SUFFIX_KEY = "hdfs.inUseSuffix"; @Override public void configure(Context context) { boolean inUseSuffixEnabled = context.getBoolean(IN_USE_SUFFIX_ENABLED_KEY, true); if (!inUseSuffixEnabled) { context.put(IN_USE_SUFFIX_KEY, ""); } super.configure(context); } {code} and updated the flume sink type to: {code:title=flume-conf.properties} my_agent.sinks.my_hdfs_sink.type = com.mycompany.MyHDFSEventSink my_agent.sinks.my_hdfs_sink.hdfs.inUseSuffixEnabled = false # ... {code} > Allow inUseSuffix to be null/empty > -- > > Key: FLUME-2653 > URL: https://issues.apache.org/jira/browse/FLUME-2653 > Project: Flume > Issue Type: Improvement > Components: Sinks+Sources >Affects Versions: v1.5.1 >Reporter: Andrew Jones > Labels: hdfssink > > At the moment, it doesn't seem possible to set the suffix to null/empty. We've tried > {{''}} which just adds the quotes to the end, and setting to nothing, which > just uses the default {{.tmp}}. > We want the _in use_ file to have the same name as the _closed_ file, so we > can read from files that are in use without the file moving from underneath > us. In our use case, we know that an in use file is still readable and > parseable, because it is just text with a JSON document per line. 
> It looks like [the HDFS sink > code|https://github.com/apache/flume/blob/542b1695033d330eb00ae81713fdc838b88332b6/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java#L618] > can handle this change already, but at the moment there is no way to set the > {{bucketPath}} and {{targetPath}} to be the same. -- This message was sent by Atlassian JIRA (v6.3.4#6332)