[ 
https://issues.apache.org/jira/browse/FLUME-3037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15783074#comment-15783074
 ] 

Tristan Stevens commented on FLUME-3037:
----------------------------------------

Interesting issue. I just reproduced it on Mac OS X with debug logging turned on
(see below).

There is some flawed logic in the NetcatSocketHandler.run() method.

You can see in the log below that 52 characters were read, but only two of
the events were submitted to the channel. When I changed the Python script to
introduce a delay before the connection is closed, all of the events got
through successfully.
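
For anyone wanting to repeat the test, here is a rough sketch of the delayed variant of the reporter's script. The function name send_lines, the delay_before_close parameter, and the exact placement of the sleep (after the last send, before shutdown) are my assumptions for illustration; the only thing taken from the test above is that a delay of 5 seconds before the connection goes away let all four events through.

```python
import socket
import time


def send_lines(hostname, port, lines, delay_before_close=5.0):
    """Send each line to a netcat-style listener, then wait before closing.

    delay_before_close is a hypothetical knob for this sketch; 5.0 mirrors
    the sleep(5) used in the test described above.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((hostname, port))
        for line in lines:
            # sendall() keeps writing until the whole line has been sent.
            s.sendall(line.encode("utf-8") + b"\n")
        # Assumed placement: keep the connection open for a while so the
        # receiver can handle what it has read before it sees EOF.
        time.sleep(delay_before_close)
        s.shutdown(socket.SHUT_WR)
    finally:
        s.close()
```

Calling send_lines("127.0.0.1", 44444, ["test words 1", "test words 2", "test words 3", "test words 4"]) against the configuration quoted below should correspond to the second log. This only works around the symptom on the client side; it does not fix the source.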

2016-12-28 15:02:31,972 (netcat-handler-0) [DEBUG - 
org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:315)]
 Starting connection handler
2016-12-28 15:02:31,972 (netcat-handler-0) [DEBUG - 
org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:327)]
 Chars read = 52
2016-12-28 15:02:31,973 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { 
headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 31             test words 1 }
2016-12-28 15:02:31,973 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { 
headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 32             test words 2 }
2016-12-28 15:02:31,973 (netcat-handler-0) [DEBUG - 
org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:367)]
 Connection handler exiting
2016-12-28 15:02:52,595 (conf-file-poller-0) [DEBUG - 
org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)]
 Checking file:conf/flume2.conf for changes
2016-12-28 15:03:22,601 (conf-file-poller-0) [DEBUG - 
org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)]
 Checking file:conf/flume2.conf for changes
2016-12-28 15:03:52,608 (conf-file-poller-0) [DEBUG - 
org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)]
 Checking file:conf/flume2.conf for changes
2016-12-28 15:04:22,615 (conf-file-poller-0) [DEBUG - 
org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)]
 Checking file:conf/flume2.conf for changes
2016-12-28 15:04:52,621 (conf-file-poller-0) [DEBUG - 
org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)]
 Checking file:conf/flume2.conf for changes
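
As a sanity check on the numbers in this log, here is a small sketch, assuming the payload is exactly the four lines sent by the reporter's script quoted further down. The variable names are only illustrative. It shows that the 52 characters read account for all four lines, so four events would be expected rather than two.

```python
# The four newline-terminated lines sent by the reporter's script.
lines = ["test words %d\n" % i for i in range(1, 5)]
payload = "".join(lines)

# Each line is 12 visible characters plus the newline, i.e. 13 characters.
total_chars = len(payload)

# One event is expected per newline-terminated line.
expected_events = payload.count("\n")

print("chars:", total_chars, "expected events:", expected_events)
```

So the whole payload arrived in a single read, and the missing events were lost after the read, not on the wire.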

With sleep(5) introduced into the Python script:

2016-12-28 15:04:53,517 (netcat-handler-1) [DEBUG - 
org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:315)]
 Starting connection handler
2016-12-28 15:04:53,517 (netcat-handler-1) [DEBUG - 
org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:327)]
 Chars read = 52
2016-12-28 15:04:53,518 (netcat-handler-1) [DEBUG - 
org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:331)]
 Events processed = 4
2016-12-28 15:04:54,072 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { 
headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 31             test words 1 }
2016-12-28 15:04:54,072 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { 
headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 32             test words 2 }
2016-12-28 15:04:54,072 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { 
headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 33             test words 3 }
2016-12-28 15:04:54,073 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - 
org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { 
headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 34             test words 4 }
2016-12-28 15:04:58,518 (netcat-handler-1) [DEBUG - 
org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:327)]
 Chars read = -1
2016-12-28 15:04:58,518 (netcat-handler-1) [DEBUG - 
org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:331)]
 Events processed = 0
2016-12-28 15:04:58,518 (netcat-handler-1) [DEBUG - 
org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:367)]
 Connection handler exiting
2016-12-28 15:05:22,627 (conf-file-poller-0) [DEBUG - 
org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileC

> can't get events correctly while flume working with python socket or telnet
> ---------------------------------------------------------------------------
>
>                 Key: FLUME-3037
>                 URL: https://issues.apache.org/jira/browse/FLUME-3037
>             Project: Flume
>          Issue Type: Question
>          Components: Sinks+Sources
>    Affects Versions: v1.7.0
>         Environment: ubuntu 14&java 1.8
> centos  &java 1.7
>            Reporter: finetu
>              Labels: beginner
>
> I want to use Flume to collect logs from Python scripts, so I followed the user
> guide to configure Flume with a netcat source. Testing with telnet and nc
> works well.
> Config:
> a1.sources = r1
> a1.sinks = k1
> a1.channels = c1
> # Describe/configure the source
> a1.sources.r1.type = netcat
> a1.sources.r1.bind = localhost
> a1.sources.r1.port = 44444
> a1.sinks.k1.type = logger  
> a1.channels.c1.type = memory
> a1.channels.c1.capacity = 1000
> a1.channels.c1.transactionCapacity = 100
> a1.sources.r1.channels = c1
> a1.sinks.k1.channel = c1
> Then I use Python to connect to Flume and send some words to it like this:
> import socket
> def netcat(hostname, port):
>     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>     s.connect((hostname, port))
>     s.send(b"test words 1\n")
>     s.send(b"test words 2\n")
>     s.send(b"test words 3\n")
>     s.send(b"test words 4\n")
>     s.shutdown(socket.SHUT_WR)
>     s.close()
> if __name__ == "__main__":
>     netcat("127.0.0.1",44444)
> The problem: Flume only receives 2 rows.
> Flume logs:
> 2016-12-28 16:44:32,248 (lifecycleSupervisor-1-0) [INFO - 
> org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)] Created 
> serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
> 2016-12-28 16:44:41,814 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO 
> - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { 
> headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 31             test words 1 
> }
> 2016-12-28 16:44:41,815 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO 
> - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { 
> headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 32             test words 2 
> }
> I got the same result on both Ubuntu with Java 1.8 and CentOS with Java 1.7,
> and the same result with the telnet module in Python.
> Is there anything wrong with the config or the Python script?
> I think issue 2611 is similar to this one.



