[jira] [Created] (FLUME-3190) flume shutdown hook issue when both hbase and hdfs sink are in use

2017-11-01 Thread Yuexin Zhang (JIRA)
Yuexin Zhang created FLUME-3190:
---

 Summary: flume shutdown hook issue when both hbase and hdfs sink 
are in use
 Key: FLUME-3190
 URL: https://issues.apache.org/jira/browse/FLUME-3190
 Project: Flume
  Issue Type: Bug
Affects Versions: 1.6.0
Reporter: Yuexin Zhang
Priority: Major


When both the HDFS and HBase sinks are in use, during shutdown (kill with SIGTERM) the HDFS sink may be unable to rename/close its .tmp HDFS file, because the underlying filesystem can be closed earlier while the other component shuts down:

{code:java}
2017/10/23 15:34:50,858 ERROR (AbstractHDFSWriter.hflushOrSync:268) - Error while trying to hflushOrSync!
2017/10/23 15:34:50,859 WARN (BucketWriter.close:400) - failed to close() HDFSWriter for file (/tmp/bothSource/FlumeData.1508744083526.tmp). Exception follows.
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:860)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2388)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:2334)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flume.sink.hdfs.AbstractHDFSWriter.hflushOrSync(AbstractHDFSWriter.java:265)
    at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:134)
    at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:327)
    at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:323)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:701)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:698)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}

The root cause is the HBase client's DynamicClassLoader injection (see DynamicClassLoader.java in HBase). At some point HBase added a feature that dynamically loads JARs from HDFS into its class loader; to do this it obtains a DistributedFileSystem object via the standard FileSystem.get(...) (or equivalent) call.
Flume, on the other hand, also uses FileSystem.get(...) in its HDFS BucketWriter (always a single instance, coming from the cache), but it passes a setting that disables automatic close at shutdown (look for fs.automatic.close in BucketWriter.java).
When the HBase sink is active, HBase indirectly shares the FileSystem object through its internal/implicit DynamicClassLoader, but it grabs it from the cache without specifying 'do not auto-close at shutdown', because HBase is not really troubled by that. However, since the same FileSystem instance is now shared by something that wants it auto-closed and something that does not, shutdown causes a problem in Flume.
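
To make the failure concrete, here is a minimal sketch of the cache interaction, assuming Hadoop's cached FileSystem.get() behavior (same URI + same user = same cached instance; the namenode URI is hypothetical). This is illustrative only, not Flume or HBase source:

{code:java}
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SharedFsSketch {
  public static void main(String[] args) throws Exception {
    // What Flume's BucketWriter effectively asks for: leave the filesystem
    // open during JVM shutdown so the sink can flush and rename .tmp files.
    Configuration flumeConf = new Configuration();
    flumeConf.setBoolean("fs.automatic.close", false);
    FileSystem flumeFs = FileSystem.get(URI.create("hdfs://nn:8020"), flumeConf);

    // What HBase's DynamicClassLoader effectively does: a plain get() with
    // defaults (fs.automatic.close = true). The cache key ignores that flag,
    // so this returns the SAME instance Flume is using.
    FileSystem hbaseFs = FileSystem.get(URI.create("hdfs://nn:8020"), new Configuration());

    System.out.println(flumeFs == hbaseFs); // true: one shared, cached instance
    // The auto-close decision sticks with whichever call created the cached
    // instance first -- if HBase's side won, the shutdown hook closes the
    // filesystem out from under the HDFS sink ("Filesystem closed" above).
  }
}
{code}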





[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-10-30 Thread Will Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225009#comment-16225009
 ] 

Will Zhang commented on FLUME-3149:
---

Hi [~bessbd],
Thanks for your comments. I've made a few changes and responded to them accordingly.
Hopefully more people can review it; I believe it's a valuable improvement.

Best,
Yifeng

> reduce cpu cost for file source transfer while still maintaining reliability
> 
>
> Key: FLUME-3149
> URL: https://issues.apache.org/jira/browse/FLUME-3149
> Project: Flume
>  Issue Type: Improvement
>  Components: File Channel
>Reporter: Will Zhang
>Assignee: Will Zhang
>
> File channel tracks transferred events and uses a transactional mechanism to make transfers recoverable. However, it increases CPU cost due to frequent system calls like write, read, etc. The CPU cost can be very high if the transfer rate is high. In contrast, the memory channel has no such issue and requires only about 10% of the CPU cost in the same environment, but its data is not recovered if the system goes down unexpectedly.
> For sources like taildir/spooldir, I propose we track file offsets and store them locally to achieve reliability while still using the memory channel to reduce CPU cost. In fact, I have already implemented this feature in our production by storing the offsets in event headers, passing them to my own "offsetMemoryChannel", and storing these offsets on local disk, which reduces CPU cost by about 90 percent.
> Please let me know if it's worthwhile to have this feature in the community version. Thank you.
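
A minimal Java sketch of the quoted proposal, assuming hypothetical header keys ("file", "offset"); this is illustrative, not the actual patch:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class OffsetHeaderSketch {
  // Tag each event with its source file and the offset just past the event
  // body, so whatever observes the channel commit can persist "read up to
  // here" and recover after a crash without paying for a file channel.
  static Event eventWithOffset(String file, long nextOffset, String line) {
    Map<String, String> headers = new HashMap<>();
    headers.put("file", file);                          // hypothetical header key
    headers.put("offset", String.valueOf(nextOffset));  // hypothetical header key
    return EventBuilder.withBody(line.getBytes(StandardCharsets.UTF_8), headers);
  }
}
{code}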





[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-09-25 Thread Will Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16180281#comment-16180281
 ] 

Will Zhang commented on FLUME-3149:
---

Hi [~bessbd],
I'll update the user guide, and hopefully someone can review this PR.
Thank you.

Best,
Will






[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-09-23 Thread Will Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177665#comment-16177665
 ] 

Will Zhang commented on FLUME-3149:
---

Hi [~bessbd],
It's been a while since I opened the PR, so I wonder whether you or anyone else has had time to review this patch?
Thank you very much.

Best,
Will






[jira] [Assigned] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-09-12 Thread Will Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Will Zhang reassigned FLUME-3149:
-

Assignee: Will Zhang






[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-09-11 Thread Will Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16161203#comment-16161203
 ] 

Will Zhang commented on FLUME-3149:
---

Hi [~bessbd],

I've appended a commit; please review.
Also, it seems I need permission to assign the issue to myself so that every commit can be shown here?
Thank you.

Best,
Will






[jira] [Comment Edited] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-09-08 Thread Will Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158975#comment-16158975
 ] 

Will Zhang edited comment on FLUME-3149 at 9/8/17 5:29 PM:
---

Hi [~bessbd],

I didn't quite get how the "persisted" flag could be implemented. In this file-transfer scenario, the source won't know when events are "safely" committed unless the channel tells the source at the exact moment the events are taken and committed from the channel. In my understanding, an event is only "safely committed" when the sink consumes it. And as far as I know, a callback to the source is necessary in the commit() method of the channel transaction.

So, as you mentioned, we could implement a new event.

My idea is to create a new Event class, say "NotificationEvent", which extends SimpleEvent and contains a callback function. In the commit method of the memory channel, it would check whether the takeList is non-empty and whether each event is an instance of NotificationEvent. If so, it would invoke the callback defined in each event, which uses the header info to update the position info in taildir. At the end of the commit method, it would call perhaps another callback on the event that just writes the updated taildir position info to disk.

This way, the changes are mainly on the taildir side, and the changes on the channel side are small: only a branch that checks the takeList and fires the callbacks in the commit method. I ran a simple test and it works, but I'm not sure whether it meets the requirement to "separate the channel and source".

What do you think?
Thank you.
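
A rough Java sketch of the idea described above (names are hypothetical, not committed code):

{code:java}
import org.apache.flume.event.SimpleEvent;

// An event carrying a callback that a channel can fire inside its
// transaction commit, letting the taildir source learn the exact moment
// its events are safely taken and committed.
public class NotificationEvent extends SimpleEvent {
  private final Runnable onCommit;

  public NotificationEvent(Runnable onCommit) {
    this.onCommit = onCommit;
  }

  // The channel's commit method would call this for each NotificationEvent
  // in its takeList, e.g. to update and then flush taildir's position file.
  public void committed() {
    onCommit.run();
  }
}
{code}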



was (Author: zyfo2):
Hi [~bessbd],

I didn't quite get how the "persisted" flag could be implemented. In this file-transfer scenario, the source won't know when events are "safely" committed unless the channel tells the source at the exact moment the events are taken and committed from the channel. In my understanding, an event is only "safely committed" when the sink consumes it. And as far as I know, a callback to the source is necessary in the commit() method of the channel transaction.

So, as you mentioned, we could implement a new event.

My idea is to create a new Event class, say "NotificationEvent", which extends SimpleEvent and contains a callback function. In the commit method of the memory channel, it would check whether the takeList is non-empty and whether each event is an instance of NotificationEvent. If so, it would invoke the callback defined in each event, which uses the header info to update the position info in taildir. At the end of the commit method, it would call perhaps another callback on the event that just writes the updated position info to disk.

This way, the changes are mainly on the taildir side, and the changes on the channel side are small: only a branch that checks the takeList and fires the callbacks in the commit method. I ran a simple test and it works, but I'm not sure whether it meets the requirement to "separate the channel and source".

What do you think?
Thank you.









[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-09-03 Thread Will Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16151773#comment-16151773
 ] 

Will Zhang commented on FLUME-3149:
---

Hi [~fszabo],
A PollableSourceChannel sounds like a good idea, but I have a few questions about it.
Do you mean a channel that implements both the PollableSource interface and BasicChannelSemantics?
If so, how would the PollableSource process() method work? Currently it puts events into the channel. As you mentioned, this PollableSourceChannel has no buffer, so I suppose it doesn't need a put method and only reads events and keeps track of them as they are taken in a transaction?

And [~bessbd], could you clarify the idea of "a new Event implementation which has a "persisted" flag or callback"? I suppose an event with a callback is something that can call back to the source when events are consumed and committed in the channel, so that we have an accurate position record. If so, this seems to be a clean way to go.
Also, what does the "persisted" flag mean?

Please correct me if I got anything wrong.
Thank you.
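
For what it's worth, a bufferless channel in that spirit might look roughly like the following sketch against Flume's channel SPI; the class name comes from the discussion above, and the file-reading helpers are placeholder stubs, not a real design:

{code:java}
import org.apache.flume.Event;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;

// Sketch only: a channel with no internal buffer that serves take() straight
// from the tailed files and persists offsets on commit.
public class PollableSourceChannelSketch extends BasicChannelSemantics {
  @Override
  protected BasicTransactionSemantics createTransaction() {
    return new BasicTransactionSemantics() {
      @Override
      protected void doPut(Event event) {
        // No buffer: nothing to store; a source would not put() here at all.
      }

      @Override
      protected Event doTake() {
        return readNextEventFromFile(); // placeholder: read + track offset
      }

      @Override
      protected void doCommit() {
        persistOffsets(); // placeholder: write "read up to here" to disk
      }

      @Override
      protected void doRollback() {
        rewindToLastCommittedOffset(); // placeholder: forget uncommitted reads
      }
    };
  }

  private Event readNextEventFromFile() { return null; } // stub
  private void persistOffsets() { }                      // stub
  private void rewindToLastCommittedOffset() { }         // stub
}
{code}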






[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-08-29 Thread will zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144789#comment-16144789
 ] 

will zhang commented on FLUME-3149:
---

Hi [~bessbd],
Thank you for your clarification.
Right now, I can't think of a way to separate them, because the key to my idea is to let the channel inform the source when it actually commits; however, Flume's current design decouples them.
I can move the channel's "writePos" part to the source side, but the source still needs to know when the channel commits.
I'll keep working on it and post updates on any progress.
Thank you.






[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-08-26 Thread will zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16142822#comment-16142822
 ] 

will zhang commented on FLUME-3149:
---

Hi [~fszabo],
1. Some of our use cases don't include Kafka, and some may only use the HDFS sink, so the Kafka channel is not an option for general-purpose use.

2. I tried increasing the batch size (from 500 to 200,000) and using a buffered output stream in the file channel in the first place; it helped a little, but still not enough, due to system calls like seek/read/write, which I analyzed with VisualVM. For a 1 Mb/s file transfer using the file channel, the CPU cost is still more than 10% overall (before increasing the batch size it was about 13%), while the memory channel costs only about 3% at the same transfer rate.

3. IMO, file transfer is a very common use case, yet none of the channels is suitable for achieving both high performance and low cost, and reliability is often of critical importance in production, so the file channel seems to be the only option. But we don't actually need to store the events in files again, since they originally come from local files. So maybe a separate channel specifically for file transfer is a better choice?

Please correct me if I got something wrong. Thank you.






[jira] [Issue Comment Deleted] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-08-26 Thread will zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

will zhang updated FLUME-3149:
--
Comment: was deleted

(was: Btw, just to mention it: for sinks like HDFS, when using taildir we could pass only the offsets, without the actual message body, to the sink side to take advantage of zero copy and further reduce CPU cost. That is probably another issue.)






[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-08-26 Thread will zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16142711#comment-16142711
 ] 

will zhang commented on FLUME-3149:
---

Btw, just to mention it: for sinks like HDFS, when using taildir we could pass only the offsets, without the actual message body, to the sink side to take advantage of zero copy and further reduce CPU cost. That is probably another issue.






[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-08-26 Thread will zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16142706#comment-16142706
 ] 

will zhang commented on FLUME-3149:
---

Hi [~bessbd], 
I have made a PR. 
Would you kindly review it?
Thank you.






[jira] [Issue Comment Deleted] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-08-26 Thread will zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

will zhang updated FLUME-3149:
--
Comment: was deleted

(was: pr)






[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-08-22 Thread will zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137790#comment-16137790
 ] 

will zhang commented on FLUME-3149:
---

Hi [~bessbd],
Thanks for the reply. I'll patch it soon.






[jira] [Commented] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-08-22 Thread will zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136600#comment-16136600
 ] 

will zhang commented on FLUME-3149:
---

Can anyone please help review this issue? Thanks.






[jira] [Updated] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-08-17 Thread will zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLUME-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

will zhang updated FLUME-3149:
--
Description: 
File channel tracks transferred events and uses a transactional mechanism to make transfers recoverable. However, it increases CPU cost due to frequent system calls like write, read, etc. The CPU cost can be very high if the transfer rate is high. In contrast, the memory channel has no such issue and requires only about 10% of the CPU cost in the same environment, but its data is not recovered if the system goes down unexpectedly.
For sources like taildir/spooldir, I propose we track file offsets and store them locally to achieve reliability while still using the memory channel to reduce CPU cost. In fact, I have already implemented this feature in our production by storing the offsets in event headers, passing them to my own "offsetMemoryChannel", and storing these offsets on local disk, which reduces CPU cost by about 90 percent.
Please let me know if it's worthwhile to have this feature in the community version. Thank you.

  was:
File channel tracks transferred events and uses a transactional mechanism to make transfers recoverable. However, it increases CPU cost due to frequent system calls like write, read, etc. The CPU cost can be very high if the transfer rate is high. In contrast, the memory channel has no such issue and requires only about 10% of the CPU cost in the same environment, but its data is not recovered if the system goes down unexpectedly.
For sources like taildir/spooldir, I propose we track file offsets and store them locally to achieve reliability while still using the memory channel to reduce CPU cost. In fact, I have already implemented this feature in our production, and it reduces CPU cost by about 90 percent.
Please let me know if it's worthwhile to have this feature in the community. Thank you.







[jira] [Created] (FLUME-3149) reduce cpu cost for file source transfer while still maintaining reliability

2017-08-17 Thread will zhang (JIRA)
will zhang created FLUME-3149:
-

 Summary: reduce cpu cost for file source transfer while still 
maintaining reliability
 Key: FLUME-3149
 URL: https://issues.apache.org/jira/browse/FLUME-3149
 Project: Flume
  Issue Type: Improvement
  Components: File Channel
Reporter: will zhang


File channel tracks transferred events and uses a transactional mechanism to make transfers recoverable. However, it increases CPU cost due to frequent system calls like write, read, etc. The CPU cost can be very high if the transfer rate is high. In contrast, the memory channel has no such issue and requires only about 10% of the CPU cost in the same environment, but its data is not recovered if the system goes down unexpectedly.
For sources like taildir/spooldir, I propose we track file offsets and store them locally to achieve reliability while still using the memory channel to reduce CPU cost. In fact, I have already implemented this feature in our production, and it reduces CPU cost by about 90 percent.
Please let me know if it's worthwhile to have this feature in the community. Thank you.





[jira] [Commented] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data

2017-06-20 Thread Yongxi Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16055351#comment-16055351
 ] 

Yongxi Zhang commented on FLUME-3106:
-

[~niklaus.xiao], hi niklaus, could you give me a detailed explanation? I have thought about it for a long time, but I still don't understand it. I think it is reasonable to ignore the exception and return null when the takeList of the memory channel is full.
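
A self-contained sketch of the proposed semantics (illustrative; simplified from the real MemoryChannel transaction, which also manages counters and byte semaphores):

{code:java}
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.flume.Event;

// A full take list answers like an empty channel (null), so the sink commits
// the partial batch instead of throwing and rolling back events it has
// already written out.
public class TakeSketch {
  private final LinkedBlockingDeque<Event> queue = new LinkedBlockingDeque<>(100);  // capacity
  private final LinkedBlockingDeque<Event> takeList = new LinkedBlockingDeque<>(5); // transactionCapacity

  Event doTake() throws InterruptedException {
    if (takeList.remainingCapacity() == 0) {
      return null; // proposed: end the batch here rather than throw ChannelException
    }
    Event event = queue.poll(); // next event from the channel, if any
    if (event != null) {
      takeList.put(event);      // remembered for rollback until commit
    }
    return event;
  }
}
{code}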

> When batchSize of sink greater than transactionCapacity of Memory Channel, 
> Flume can produce endless data
> -
>
> Key: FLUME-3106
> URL: https://issues.apache.org/jira/browse/FLUME-3106
> Project: Flume
>  Issue Type: Bug
>  Components: Channel
>Affects Versions: 1.7.0
>Reporter: Yongxi Zhang
> Fix For: 1.8.0
>
> Attachments: FLUME-3106-0.patch
>
>
> Flume can produce endless data when using the following config:
> {code:xml}
> agent.sources = src1
> agent.sinks = sink1
> agent.channels = ch2
> agent.sources.src1.type = spooldir
> agent.sources.src1.channels = ch2
> agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir
> agent.sources.src1.fileHeader = false
> agent.sources.src1.batchSize = 5
> agent.channels.ch2.type=memory
> agent.channels.ch2.capacity=100
> agent.channels.ch2.transactionCapacity=5
> agent.sinks.sink1.type = hdfs
> agent.sinks.sink1.channel = ch2
> agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/
> agent.sinks.sink1.hdfs.rollInterval=1
> agent.sinks.sink1.hdfs.fileType = DataStream
> agent.sinks.sink1.hdfs.writeFormat = Text
> agent.sinks.sink1.hdfs.batchSize = 10
> {code}
> And there are Exceptions like this:
> {code:xml}
> org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count
> at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99)
> at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
> at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
> at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> 17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count
> at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> When the takeList of the memory channel is full, a ChannelException is thrown, yet the events in the takeList have already been written by the sink and are rolled back to the memory channel's queue at the same time; this is not reasonable.
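
For reference, the reproduction above stops once the sink never asks a single transaction for more events than its take list can hold; a minimal, illustrative tweak to the config above:

{code:xml}
# Keep each sink batch within one channel transaction:
# sink batchSize <= channel transactionCapacity
agent.channels.ch2.transactionCapacity=10
agent.sinks.sink1.hdfs.batchSize = 10
{code}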





[jira] [Updated] (FLUME-3107) When batchSize of sink greater than transactionCapacity of File Channel, Flume can produce endless data

2017-06-09 Thread Yongxi Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLUME-3107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yongxi Zhang updated FLUME-3107:

Attachment: FLUME-3107-0.patch

> When batchSize of sink greater than transactionCapacity of File Channel, 
> Flume can produce endless data
> ---
>
> Key: FLUME-3107
> URL: https://issues.apache.org/jira/browse/FLUME-3107
> Project: Flume
>  Issue Type: Bug
>  Components: File Channel
>Affects Versions: 1.7.0
>Reporter: Yongxi Zhang
> Fix For: 1.8.0
>
> Attachments: FLUME-3107-0.patch
>
>
> This problem is similar to FLUME-3106: Flume can produce endless data when the sink's batchSize is greater than the file channel's transactionCapacity. You can reproduce it with the following config:
> {code:xml}
> agent.sources = src1
> agent.sinks = sink1
> agent.channels = ch2
> agent.sources.src1.type = spooldir
> agent.sources.src1.channels = ch2
> agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir
> agent.sources.src1.fileHeader = false
> agent.sources.src1.batchSize = 5
> agent.channels.ch2.type=file
> agent.channels.ch2.capacity=100
> agent.channels.ch2.checkpointDir=/home/kafka/flumefilechannel/checkpointDir
> agent.channels.ch2.dataDirs=/home/kafka/flumefilechannel/dataDirs
> agent.channels.ch2.transactionCapacity=5
> agent.sinks.sink1.type = hdfs
> agent.sinks.sink1.channel = ch2
> agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/
> agent.sinks.sink1.hdfs.rollInterval=1
> agent.sinks.sink1.hdfs.fileType = DataStream
> agent.sinks.sink1.hdfs.writeFormat = Text
> agent.sinks.sink1.hdfs.batchSize = 10
> {code}
> Exceptions like this:
> {code:xml}
> 17/06/09 17:16:18 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for FileBackedTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count. [channel=ch2]
> at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.ChannelException: Take list for FileBackedTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count. [channel=ch2]
> at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doTake(FileChannel.java:531)
> at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
> at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
> at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
> ... 3 more
> {code}





[jira] [Created] (FLUME-3107) When batchSize of sink greater than transactionCapacity of File Channel, Flume can produce endless data

2017-06-09 Thread Yongxi Zhang (JIRA)
Yongxi Zhang created FLUME-3107:
---

 Summary: When batchSize of sink greater than transactionCapacity 
of File Channel, Flume can produce endless data
 Key: FLUME-3107
 URL: https://issues.apache.org/jira/browse/FLUME-3107
 Project: Flume
  Issue Type: Bug
  Components: File Channel
Affects Versions: 1.7.0
Reporter: Yongxi Zhang
 Fix For: 1.8.0


This problem is similar to FLUME-3106: Flume can produce endless data when the sink's batchSize is greater than the file channel's transactionCapacity. You can reproduce it with the following config:
{code:xml}
agent.sources = src1
agent.sinks = sink1
agent.channels = ch2

agent.sources.src1.type = spooldir
agent.sources.src1.channels = ch2
agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir
agent.sources.src1.fileHeader = false
agent.sources.src1.batchSize = 5

agent.channels.ch2.type=file
agent.channels.ch2.capacity=100
agent.channels.ch2.checkpointDir=/home/kafka/flumefilechannel/checkpointDir
agent.channels.ch2.dataDirs=/home/kafka/flumefilechannel/dataDirs
agent.channels.ch2.transactionCapacity=5

agent.sinks.sink1.type = hdfs
agent.sinks.sink1.channel = ch2
agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/
agent.sinks.sink1.hdfs.rollInterval=1
agent.sinks.sink1.hdfs.fileType = DataStream
agent.sinks.sink1.hdfs.writeFormat = Text
agent.sinks.sink1.hdfs.batchSize = 10
{code}
Exceptions like this:
{code:xml}
17/06/09 17:16:18 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for FileBackedTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count. [channel=ch2]
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Take list for FileBackedTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count. [channel=ch2]
    at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doTake(FileChannel.java:531)
    at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
    at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
    ... 3 more
{code}






[jira] [Commented] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data

2017-06-09 Thread Yongxi Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044177#comment-16044177
 ] 

Yongxi Zhang commented on FLUME-3106:
-

I think that when the takeList is full, the memory channel should return null, not throw an exception.






[jira] [Updated] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data

2017-06-09 Thread Yongxi Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yongxi Zhang updated FLUME-3106:

Description: 
Flume can produce endless data when using the following config:
{code:xml}
agent.sources = src1
agent.sinks = sink1
agent.channels = ch2

agent.sources.src1.type = spooldir
agent.sources.src1.channels = ch2
agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir
agent.sources.src1.fileHeader = false
agent.sources.src1.batchSize = 5

agent.channels.ch2.type=memory
agent.channels.ch2.capacity=100
agent.channels.ch2.transactionCapacity=5

agent.sinks.sink1.type = hdfs
agent.sinks.sink1.channel = ch2
agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/
agent.sinks.sink1.hdfs.rollInterval=1
agent.sinks.sink1.hdfs.fileType = DataStream
agent.sinks.sink1.hdfs.writeFormat = Text
agent.sinks.sink1.hdfs.batchSize = 10
{code}
And there are Exceptions like this:
{code:xml}
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 
full, consider committing more frequently, increasing capaci
ty, or increasing thread count
at 
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99)
at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception 
follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: 
Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count
at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
{code}

When the takeList of the Memory Channel is full, a ChannelException is thrown.
The events in the takeList have already been written by the sink, yet they are
rolled back to the Memory Channel's queue at the same time, which is not
reasonable.

  was:
Flume can produce endless data when using the following config:
{code:xml}
agent.sources = src1
agent.sinks = sink1
agent.channels = ch2

agent.sources.src1.type = spooldir
agent.sources.src1.channels = ch2
agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir
agent.sources.src1.fileHeader = false
agent.sources.src1.batchSize = 5

agent.channels.ch2.type=memory
agent.channels.ch2.capacity=100
agent.channels.ch2.transactionCapacity=5

agent.sinks.sink1.type = hdfs
agent.sinks.sink1.channel = ch2
agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/
agent.sinks.sink1.hdfs.rollInterval=1
agent.sinks.sink1.hdfs.fileType = DataStream
agent.sinks.sink1.hdfs.writeFormat = Text
agent.sinks.sink1.hdfs.batchSize = 10
{code}
And there are Exceptions like this:
{panel}
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 
full, consider committing more frequently, increasing capacity, or increasing thread count
at 
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99)
at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception 
follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: 
Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count
at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
{panel}

When the takeList of the Memory Channel is full, a ChannelException is thrown.
The events in the takeList have already been written by the sink, yet they are
rolled back to the Memory Channel's queue at the same time, which is not
reasonable.


> When batchSize 

[jira] [Updated] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data

2017-06-09 Thread Yongxi Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yongxi Zhang updated FLUME-3106:

Description: 
Flume can produce endless data when using the following config:
{code:xml}
agent.sources = src1
agent.sinks = sink1
agent.channels = ch2

agent.sources.src1.type = spooldir
agent.sources.src1.channels = ch2
agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir
agent.sources.src1.fileHeader = false
agent.sources.src1.batchSize = 5

agent.channels.ch2.type=memory
agent.channels.ch2.capacity=100
agent.channels.ch2.transactionCapacity=5

agent.sinks.sink1.type = hdfs
agent.sinks.sink1.channel = ch2
agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/
agent.sinks.sink1.hdfs.rollInterval=1
agent.sinks.sink1.hdfs.fileType = DataStream
agent.sinks.sink1.hdfs.writeFormat = Text
agent.sinks.sink1.hdfs.batchSize = 10
{code}
And there are Exceptions like this:
{panel}
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 
full, consider committing more frequently, increasing capacity, or increasing thread count
at 
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99)
at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception 
follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: 
Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count
at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
{panel}

When the takeList of the Memory Channel is full, a ChannelException is thrown.
The events in the takeList have already been written by the sink, yet they are
rolled back to the Memory Channel's queue at the same time, which is not
reasonable.

  was:
Flume can produce endless data when using the following config:
{panel}
agent.sources = src1
agent.sinks = sink1
agent.channels = ch2

agent.sources.src1.type = spooldir
agent.sources.src1.channels = ch2
agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir
agent.sources.src1.fileHeader = false
agent.sources.src1.batchSize = 5

agent.channels.ch2.type=memory
agent.channels.ch2.capacity=100
agent.channels.ch2.transactionCapacity=5

agent.sinks.sink1.type = hdfs
agent.sinks.sink1.channel = ch2
agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/
agent.sinks.sink1.hdfs.rollInterval=1
agent.sinks.sink1.hdfs.fileType = DataStream
agent.sinks.sink1.hdfs.writeFormat = Text
agent.sinks.sink1.hdfs.batchSize = 10
{panel}
And there are Exceptions like this:
{panel}
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 
full, consider committing more frequently, increasing capacity, or increasing thread count
at 
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99)
at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception 
follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: 
Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count
at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
{panel}

When the takeList of the Memory Channel is full, a ChannelException is thrown.
The events in the takeList have already been written by the sink, yet they are
rolled back to the Memory Channel's queue at the same time, which is not
reasonable.


> When batchSize of 

[jira] [Updated] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data

2017-06-09 Thread Yongxi Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLUME-3106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yongxi Zhang updated FLUME-3106:

Attachment: FLUME-3106-0.patch

> When batchSize of sink greater than transactionCapacity of Memory Channel, 
> Flume can produce endless data
> -
>
> Key: FLUME-3106
> URL: https://issues.apache.org/jira/browse/FLUME-3106
> Project: Flume
>  Issue Type: Bug
>  Components: Channel
>Affects Versions: 1.7.0
>Reporter: Yongxi Zhang
> Fix For: 1.8.0
>
> Attachments: FLUME-3106-0.patch
>
>
> Flume can produce endless data when using the following config:
> {panel}
> agent.sources = src1
> agent.sinks = sink1
> agent.channels = ch2
> agent.sources.src1.type = spooldir
> agent.sources.src1.channels = ch2
> agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir
> agent.sources.src1.fileHeader = false
> agent.sources.src1.batchSize = 5
> agent.channels.ch2.type=memory
> agent.channels.ch2.capacity=100
> agent.channels.ch2.transactionCapacity=5
> agent.sinks.sink1.type = hdfs
> agent.sinks.sink1.channel = ch2
> agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/
> agent.sinks.sink1.hdfs.rollInterval=1
> agent.sinks.sink1.hdfs.fileType = DataStream
> agent.sinks.sink1.hdfs.writeFormat = Text
> agent.sinks.sink1.hdfs.batchSize = 10
> {panel}
> And there are Exceptions like this:
> {panel}
> org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 
> 5 full, consider committing more frequently, increasing capacity, or increasing thread count
> at 
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99)
> at 
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
> at 
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
> at 
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
> at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> 17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception 
> follows.
> org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: 
> Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count
> at 
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
> at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> {panel}
> When the takeList of the Memory Channel is full, a ChannelException is
> thrown. The events in the takeList have already been written by the sink, yet
> they are rolled back to the Memory Channel's queue at the same time, which is
> not reasonable.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLUME-3106) When batchSize of sink greater than transactionCapacity of Memory Channel, Flume can produce endless data

2017-06-09 Thread Yongxi Zhang (JIRA)
Yongxi Zhang created FLUME-3106:
---

 Summary: When batchSize of sink greater than transactionCapacity 
of Memory Channel, Flume can produce endless data
 Key: FLUME-3106
 URL: https://issues.apache.org/jira/browse/FLUME-3106
 Project: Flume
  Issue Type: Bug
  Components: Channel
Affects Versions: 1.7.0
Reporter: Yongxi Zhang
 Fix For: 1.8.0


Flume can produce endless data when using the following config:
{panel}
agent.sources = src1
agent.sinks = sink1
agent.channels = ch2

agent.sources.src1.type = spooldir
agent.sources.src1.channels = ch2
agent.sources.src1.spoolDir = /home/kafka/flumeSpooldir
agent.sources.src1.fileHeader = false
agent.sources.src1.batchSize = 5

agent.channels.ch2.type=memory
agent.channels.ch2.capacity=100
agent.channels.ch2.transactionCapacity=5

agent.sinks.sink1.type = hdfs
agent.sinks.sink1.channel = ch2
agent.sinks.sink1.hdfs.path = hdfs://kafka1:9000/flume/
agent.sinks.sink1.hdfs.rollInterval=1
agent.sinks.sink1.hdfs.fileType = DataStream
agent.sinks.sink1.hdfs.writeFormat = Text
agent.sinks.sink1.hdfs.batchSize = 10
{panel}
And there are Exceptions like this:
{panel}
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 5 
full, consider committing more frequently, increasing capacity, or increasing thread count
at 
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:99)
at 
org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at 
org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:362)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
17/06/09 09:48:04 ERROR flume.SinkRunner: Unable to deliver event. Exception 
follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: 
Take list for MemoryTransaction, capacity 5 full, consider committing more frequently, increasing capacity, or increasing thread count
at 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
{panel}

When the takeList of the Memory Channel is full, a ChannelException is thrown.
The events in the takeList have already been written by the sink, yet they are
rolled back to the Memory Channel's queue at the same time, which is not
reasonable.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLUME-3070) The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only be replaced once

2017-05-25 Thread Yongxi Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16025737#comment-16025737
 ] 

Yongxi Zhang commented on FLUME-3070:
-

[~mpercy][~bessbd][~hshreedharan] Anybody available for review? 

> The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only
> be replaced once
> 
>
> Key: FLUME-3070
> URL: https://issues.apache.org/jira/browse/FLUME-3070
> Project: Flume
>  Issue Type: Improvement
>Affects Versions: 1.7.0
>Reporter: Yongxi Zhang
> Attachments: FLUME-3070-0.patch
>
>
> The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only
> be replaced once, which helps the HDFS Sink process data efficiently.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLUME-3089) flume hive sink

2017-05-06 Thread mengge zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLUME-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mengge zhang updated FLUME-3089:

Description: 
When Flume is running, the following error is reported in the background:
2017-05-06 21:17:19,544 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR 
- org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to 
deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalArgumentException
at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException
at java.sql.Date.valueOf(Date.java:143)
at 
org.apache.hive.hcatalog.data.JsonSerDe.extractCurrentField(JsonSerDe.java:310)
at 
org.apache.hive.hcatalog.data.JsonSerDe.populateRecord(JsonSerDe.java:218)
at 
org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:174)
at 
org.apache.hive.hcatalog.streaming.StrictJsonWriter.encode(StrictJsonWriter.java:156)
at 
org.apache.hive.hcatalog.streaming.StrictJsonWriter.write(StrictJsonWriter.java:123)
at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.writeImpl(HiveEndPoint.java:815)
at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:782)
at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:735)
at 
org.apache.flume.sink.hive.HiveJsonSerializer.write(HiveJsonSerializer.java:42)
at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:158)
at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:152)
at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more


I do not know why; please help me, thanks.
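
A plausible reading of the trace above, offered with the caveat that the input data is not shown: java.sql.Date.valueOf accepts only the strict JDBC form yyyy-[m]m-[d]d and throws IllegalArgumentException for anything else, which is exactly the "Caused by" frame the Hive JsonSerDe hits when a DATE-typed column in the incoming JSON uses a different format. A runnable sketch:

{code:java}
import java.sql.Date;

public class DateValueOfDemo {
  public static void main(String[] args) {
    // Accepted: strict yyyy-[m]m-[d]d form.
    System.out.println(Date.valueOf("2017-05-06"));

    // Rejected: any other format throws IllegalArgumentException,
    // matching the "Caused by" frame in the trace above.
    Date.valueOf("06/05/2017");
  }
}
{code}

If that is the cause, reformatting the date fields in the source events, or declaring the column as STRING, should let the sink proceed.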

  was:
When Flume is running, the following error is reported in the background:
2017-05-06 21:17:19,544 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR 
- org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to 
deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalArgumentException
at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException
at java.sql.Date.valueOf(Date.java:143)
at 
org.apache.hive.hcatalog.data.JsonSerDe.extractCurrentField(JsonSerDe.java:310)
at 
org.apache.hive.hcatalog.data.JsonSerDe.populateRecord(JsonSerDe.java:218)
at 
org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:174)
at 
org.apache.hive.hcatalog.streaming.StrictJsonWriter.encode(StrictJsonWriter.java:156)
at 
org.apache.hive.hcatalog.streaming.StrictJsonWriter.write(StrictJsonWriter.java:123)
at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.writeImpl(HiveEndPoint.java:815)
at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:782)
at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:735)
at 
org.apache.flume.sink.hive.HiveJsonSerializer.write(HiveJsonSerializer.java:42)
at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:158)
at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:152)
at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more




> flume  hive sink
> 
>
> Key: FLUME-3089
> URL: https://issues.apache.org/jira/browse/FLUME-3089
> Project: Flume
>  Issue Type: Bug
> Environment: system:Ubuntu 16.04.1 LTS \n \l
> flume:apache-flume-1.7.0-bin.tar.gz
> hive:apache-hive-2.1.1-bin
>Reporter: mengge zhang
>
> When Flume is running, the following error is reported in the background:
> 2017-05-06 21:17:19,544 (SinkRunner-PollingRunner-DefaultSinkProcessor) 
> [ERROR - 

[jira] [Created] (FLUME-3089) flume hive sink

2017-05-06 Thread mengge zhang (JIRA)
mengge zhang created FLUME-3089:
---

 Summary: flume  hive sink
 Key: FLUME-3089
 URL: https://issues.apache.org/jira/browse/FLUME-3089
 Project: Flume
  Issue Type: Bug
 Environment: system:Ubuntu 16.04.1 LTS \n \l
flume:apache-flume-1.7.0-bin.tar.gz
hive:apache-hive-2.1.1-bin
Reporter: mengge zhang


When Flume is running, the following error is reported in the background:
2017-05-06 21:17:19,544 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR 
- org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to 
deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalArgumentException
at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267)
at 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException
at java.sql.Date.valueOf(Date.java:143)
at 
org.apache.hive.hcatalog.data.JsonSerDe.extractCurrentField(JsonSerDe.java:310)
at 
org.apache.hive.hcatalog.data.JsonSerDe.populateRecord(JsonSerDe.java:218)
at 
org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:174)
at 
org.apache.hive.hcatalog.streaming.StrictJsonWriter.encode(StrictJsonWriter.java:156)
at 
org.apache.hive.hcatalog.streaming.StrictJsonWriter.write(StrictJsonWriter.java:123)
at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.writeImpl(HiveEndPoint.java:815)
at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:782)
at 
org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:735)
at 
org.apache.flume.sink.hive.HiveJsonSerializer.write(HiveJsonSerializer.java:42)
at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:158)
at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:152)
at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLUME-3070) The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only be replaced once

2017-03-08 Thread Yongxi Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLUME-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yongxi Zhang updated FLUME-3070:

Attachment: FLUME-3070-0.patch

> The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only
> be replaced once
> 
>
> Key: FLUME-3070
> URL: https://issues.apache.org/jira/browse/FLUME-3070
> Project: Flume
>  Issue Type: Improvement
>Affects Versions: v1.7.0
>Reporter: Yongxi Zhang
> Attachments: FLUME-3070-0.patch
>
>
> The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only
> be replaced once, which helps the HDFS Sink process data efficiently.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLUME-3070) The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only be replaced once

2017-03-08 Thread Yongxi Zhang (JIRA)
Yongxi Zhang created FLUME-3070:
---

 Summary: The escape strings %[localhost], %[IP], and %[FQDN] of 
Hdfs Sink should only be replaced once
 Key: FLUME-3070
 URL: https://issues.apache.org/jira/browse/FLUME-3070
 Project: Flume
  Issue Type: Improvement
Affects Versions: v1.7.0
Reporter: Yongxi Zhang


The escape strings %[localhost], %[IP], and %[FQDN] of Hdfs Sink should only be
replaced once, which helps the HDFS Sink process data efficiently.
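
One plausible reading of "replaced once" is to resolve the host-derived values a single time and cache them instead of re-resolving on every event; the sketch below illustrates that idea with hypothetical names and is not the attached patch.

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: resolves host-derived escape values once and caches
// them, so per-event path substitution does no repeated lookups.
public class HostEscapeCache {
  private static volatile String hostname;
  private static volatile String ip;

  public static String localhost() throws UnknownHostException {
    if (hostname == null) {
      hostname = InetAddress.getLocalHost().getHostName();   // resolved once
    }
    return hostname;
  }

  public static String ipAddress() throws UnknownHostException {
    if (ip == null) {
      ip = InetAddress.getLocalHost().getHostAddress();      // resolved once
    }
    return ip;
  }
}
{code}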



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLUME-2915) The kafka channel using new APIs will be stuck when the sink is avro sink

2016-05-26 Thread Julian Zhang (JIRA)
Julian Zhang created FLUME-2915:
---

 Summary: The kafka channel using new APIs will be stuck when the 
sink is avro sink
 Key: FLUME-2915
 URL: https://issues.apache.org/jira/browse/FLUME-2915
 Project: Flume
  Issue Type: Bug
  Components: Channel
Affects Versions: v1.7.0
Reporter: Julian Zhang


The Avro sink got stuck when I was using the Kafka channel, which uses the new APIs.
After a couple of hours I found the issue at KafkaChannel.java#L384:

e.getHeaders().put(KEY_HEADER, record.key());

and changed it to:

if (record.key() != null) {
  e.getHeaders().put(KEY_HEADER, record.key());
}

The reason: record.key() can be null if the user did not set a key, and the
Avro serialization of the event then throws a NullPointerException.
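
To make the described fix concrete outside the Flume code base, here is a minimal, self-contained sketch of the guard (KEY_HEADER follows the snippet above; everything else is illustrative):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class NullKeyGuardDemo {
  private static final String KEY_HEADER = "key";

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>();
    String recordKey = null;   // Kafka records may carry no key

    // Without the guard, headers would hold a null value and Avro's string
    // encoder would later throw NullPointerException while serializing the
    // event headers. Guarded version, as described in the report:
    if (recordKey != null) {
      headers.put(KEY_HEADER, recordKey);
    }
    System.out.println("headers = " + headers);
  }
}
{code}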



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLUME-2653) Allow inUseSuffix to be null/empty

2015-10-15 Thread Ke Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLUME-2653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14958778#comment-14958778
 ] 

Ke Zhang commented on FLUME-2653:
-

I got this error message when I was trying to update Flume settings via 
Cloudera Manager:
{quote}An error occurred while validating this configuration: Component 
tsv_agent.sinks.hdfs-sink-view.hdfs.inUseSuffix: Property value missing.{quote}

As workaround I extended the HDFSEventSink like below:
{code:title=MyHDFSEventSink.java|borderStyle=solid}
import org.apache.flume.Context;
import org.apache.flume.sink.hdfs.HDFSEventSink;

public class MyHDFSEventSink extends HDFSEventSink {

  public static final String IN_USE_SUFFIX_ENABLED_KEY = "hdfs.inUseSuffixEnabled";
  public static final String IN_USE_SUFFIX_KEY = "hdfs.inUseSuffix";

  @Override
  public void configure(Context context) {
    // When the suffix is disabled, force it to an empty string before the
    // parent sink reads its configuration, so .tmp is never appended.
    boolean inUseSuffixEnabled = context.getBoolean(IN_USE_SUFFIX_ENABLED_KEY, true);
    if (!inUseSuffixEnabled) {
      context.put(IN_USE_SUFFIX_KEY, "");
    }
    super.configure(context);
  }
}
{code}

and updated flume sink type to:
{code:title=flume-conf.properties}
my_agent.sinks.my_hdfs_sink.type = com.mycompany.MyHDFSEventSink
my_agent.sinks.my_hdfs_sink.hdfs.inUseSuffixEnabled = false
# ...
{code}
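
If this reading of the workaround is right, the subclass works because HDFSEventSink picks up hdfs.inUseSuffix from the Context during configure(), so overwriting it with an empty string before delegating disables the suffix without ever passing an empty property value through the validation that produced the error quoted above.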

> Allow inUseSuffix to be null/empty
> --
>
> Key: FLUME-2653
> URL: https://issues.apache.org/jira/browse/FLUME-2653
> Project: Flume
>  Issue Type: Improvement
>  Components: Sinks+Sources
>Affects Versions: v1.5.1
>Reporter: Andrew Jones
>  Labels: hdfssink
>
> At the moment, it doesn't seem possible to set the {{inUseSuffix}} to
> null/empty. We've tried {{''}}, which just adds the quotes to the end, and
> setting it to nothing, which just uses the default {{.tmp}}.
> We want the _in use_ file to have the same name as the _closed_ file, so we 
> can read from files that are in use without the file moving from underneath 
> us. In our use case, we know that an in use file is still readable and 
> parseable, because it is just text with a JSON document per line.
> It looks like [the HDFS sink 
> code|https://github.com/apache/flume/blob/542b1695033d330eb00ae81713fdc838b88332b6/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java#L618]
>  can handle this change already, but at the moment there is no way to set the 
> {{bucketPath}} and {{targetPath}} to be the same.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)