[jira] [Created] (FLUME-2815) OrderSelector may lead to no sink to use.
tcltsh created FLUME-2815:
------------------------------

             Summary: OrderSelector may lead to no sink to use.
                 Key: FLUME-2815
                 URL: https://issues.apache.org/jira/browse/FLUME-2815
             Project: Flume
          Issue Type: Bug
            Reporter: tcltsh


File path: flume/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java, in getIndexList():

On a machine under high network load, all of the sinks may time out within the same period, and once the machine returns to normal they may all still be in back-off status. Flume can then be left with no usable sink for a long time. So could we add a feature: if no sinks are returned, return all sinks?


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
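A minimal sketch of the fallback proposed above, with hypothetical names (getIndexListWithFallback, activeIndices); this is not the actual OrderSelector code. When backoff has taken every sink out of rotation, it returns all indices instead of an empty list:

{code}
import java.util.ArrayList;
import java.util.List;

public class SinkIndexFallback {
  // Sketch of the proposed behavior: if backoff state leaves no active
  // sinks, fall back to all sink indices so the sink processor always
  // has something to try instead of being left with no sink at all.
  static List<Integer> getIndexListWithFallback(List<Integer> activeIndices,
                                                int sinkCount) {
    if (!activeIndices.isEmpty()) {
      return activeIndices;
    }
    // All sinks are backed off: return every index rather than none.
    List<Integer> all = new ArrayList<Integer>(sinkCount);
    for (int i = 0; i < sinkCount; i++) {
      all.add(i);
    }
    return all;
  }
}
{code}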
[jira] [Commented] (FLUME-2307) Remove Log writetimeout
[ https://issues.apache.org/jira/browse/FLUME-2307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14963850#comment-14963850 ]

tzachi commented on FLUME-2307:
-------------------------------

Hi guys, this is still happening with Flume 1.5.0-cdh5.4.2. As mentioned above, it is also not consistent: it works for a few days as expected (deleting old files after 2 checkpoints), and then for some unknown reason it stops deleting old files until the disk gets full and the logs start shouting "Usable space exhausted". I am using 2 different file channels (with 2 different sinks), and both data directories experience the same issue (lots of old files).

> Remove Log writetimeout
> -----------------------
>
>                 Key: FLUME-2307
>                 URL: https://issues.apache.org/jira/browse/FLUME-2307
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.4.0
>            Reporter: Steve Zesch
>            Assignee: Hari Shreedharan
>             Fix For: v1.5.0
>
>         Attachments: FLUME-2307-1.patch, FLUME-2307.patch
>
>
> I've observed Flume failing to clean up old log data in FileChannels. The amount of old log data can range anywhere from tens to hundreds of GB. I was able to confirm that the channels were in fact empty. This behavior always occurs after lock timeouts when attempting to put, take, rollback, or commit to a FileChannel. Once the timeout occurs, Flume stops cleaning up the old files. I was able to confirm that the Log's writeCheckpoint method was still being called and successfully obtaining a lock from tryLockExclusive(), but I was not able to confirm removeOldLogs being called. The application log did not include "Removing old file: log-xyz" for the old files, which the Log class would output if they were correctly being removed. I suspect the lock timeouts were due to high I/O load at the time.
> Some stack traces:
> {code}
> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fileChannel]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:478)
>         at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
>         at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fileChannel]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doCommit(FileChannel.java:594)
>         at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at dataxu.flume.plugins.avro.AsyncAvroSink.process(AsyncAvroSink.java:548)
>         at dataxu.flume.plugins.ClassLoaderFlumeSink.process(ClassLoaderFlumeSink.java:33)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:619)
> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fileChannel]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:621)
>         at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
>         at dataxu.flume.plugins.avro.AvroSource.appendBatch(AvroSource.java:209)
>         at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>         at java.lang.reflect.Method.invoke(Method.java:597)
>         at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:91)
>         at org.apache.avro.ipc.Responder.respond(Responder.java:151)
>         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
>         at org.jboss.netty.channel.Channel
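For readers unfamiliar with the error above, a minimal sketch of the timed-lock pattern behind "Failed to obtain lock for writing to the log" (a hypothetical class, not the actual FileChannel source): each put/take/commit/rollback tries a shared write lock for a bounded time and throws on timeout, which is exactly what sustained I/O load can trigger.

{code}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TimedLogLock {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final long writeTimeoutSecs;

  public TimedLogLock(long writeTimeoutSecs) {
    this.writeTimeoutSecs = writeTimeoutSecs;
  }

  // Under heavy I/O the checkpoint writer can hold the lock long enough
  // that this times out, producing the ChannelException seen in the traces.
  public void withWriteLock(Runnable op) throws InterruptedException {
    if (!lock.writeLock().tryLock(writeTimeoutSecs, TimeUnit.SECONDS)) {
      throw new IllegalStateException(
          "Failed to obtain lock for writing to the log within "
          + writeTimeoutSecs + "s");
    }
    try {
      op.run();
    } finally {
      lock.writeLock().unlock();
    }
  }
}
{code}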
[jira] [Commented] (FLUME-2787) org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer does not serialize @timestamp correctly
[ https://issues.apache.org/jira/browse/FLUME-2787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14963284#comment-14963284 ]

Tianji Li commented on FLUME-2787:
----------------------------------

[~hshreedharan] Could you have a look at the patch when you get a chance?

> org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer does not
> serialize @timestamp correctly
> ----------------------------------------------------------------------------
>
>                 Key: FLUME-2787
>                 URL: https://issues.apache.org/jira/browse/FLUME-2787
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>            Reporter: Eran W
>            Priority: Minor
>         Attachments: FLUME-2787.2.patch, FLUME-2787.patch
>
>
> When using
> agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
> the event timestamp is stored as a string and not as dateOptionalTime. If agent.sinks.elastic-sink.serializer is not set, the code works as expected.
> REPRO:
> 1) Use the following config:
> agent.channels.channel-elastic.type = memory
> agent.channels.channel-elastic.capacity = 1000
> agent.channels.channel-elastic.transactionCapacity = 100
> # Define a source on agent and connect to channel.
> agent.sources.tail-source.type = exec
> agent.sources.tail-source.command = tail -4000 /home/cto/hs_err_pid11679.log
> agent.sources.tail-source.channels = channel-elastic
> # INTERCEPTORS
> agent.sources.tail-source.interceptors = timestampInterceptor
> agent.sources.tail-source.interceptors.timestampInterceptor.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
> agent.sources.tail-source.interceptors.timestampInterceptor.preserveExisting = true
> agent.sinks.elastic-sink.channel = channel-elastic
> agent.sinks.elastic-sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> agent.sinks.elastic-sink.hostNames = 127.0.0.1:9300
> agent.sinks.elastic-sink.indexName = flume_index
> agent.sinks.elastic-sink.indexType = logs_type
> agent.sinks.elastic-sink.clusterName = elasticsearch
> agent.sinks.elastic-sink.batchSize = 10
> agent.sinks.elastic-sink.ttl = 5d
> agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
> # Finally, activate.
> agent.channels = channel-elastic
> agent.sources = tail-source
> agent.sinks = elastic-sink
> 2) Run it.
> 3) Look at the elastic index which was created:
> {
>   "state": "open",
>   "settings": {
>     "index": {
>       "creation_date": "1441728286466",
>       "number_of_shards": "5",
>       "number_of_replicas": "1",
>       "version": {
>         "created": "1070199"
>       },
>       "uuid": "9u-OCPxoQHWwURHyxh15lA"
>     }
>   },
>   "mappings": {
>     "logs_type": {
>       "properties": {
>         "body": {
>           "type": "string"
>         },
>         "timestamp": {
>           "type": "string"
>         }
>       }
>     }
>   },
>   "aliases": [ ]
> }


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
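One way the string-vs-date mismatch could be avoided (a sketch only; the actual FLUME-2787 patches may take a different approach): emit the epoch-millis "timestamp" header as an ISO-8601 string, which Elasticsearch's dynamic date detection maps as a date (default format dateOptionalTime) rather than a plain string.

{code}
import java.time.Instant;
import java.util.Map;

public final class TimestampField {
  // The TimestampInterceptor stores epoch millis as a string header named
  // "timestamp". Re-emitting it in ISO-8601 form lets Elasticsearch's
  // dynamic date detection map the field as a date instead of
  // "type": "string".
  static String toIso8601(Map<String, String> headers) {
    long millis = Long.parseLong(headers.get("timestamp"));
    return Instant.ofEpochMilli(millis).toString(); // e.g. 2015-09-08T16:04:46.466Z
  }
}
{code}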
[jira] [Comment Edited] (FLUME-2792) Flume Kafka Kerberos Support
[ https://issues.apache.org/jira/browse/FLUME-2792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14958902#comment-14958902 ]

Alexander Bij edited comment on FLUME-2792 at 10/19/15 8:49 AM:
----------------------------------------------------------------

We are using HDP stack 2.3 with a similar setup, Kerberos and Ranger up and running. I can consume messages from a topic, but ingesting from Flume does not work.

We have a sink to HDFS using the flume keytab from /etc/security/keytabs/. This keytab file + principal is a setting on the HDFSSink! I want to use the KafkaSink to write to a Kerberos-secured Kafka cluster (PLAINTEXTSASL). I tried the setting you suggested at point 1. Unfortunately, the setting is not used.

log: flume[agent].log
{noformat}
15 Oct 2015 15:08:55,268 WARN  [lifecycleSupervisor-1-8] (kafka.utils.Logging$class.warn:83) - Property security.protocol is not valid
{noformat}

Flume tries to connect to the Kafka broker, and there I get the error "GSSHeader did not find the right tag". I have the feeling it's not sending with SASL. Do you have other ideas?

kafka-broker.log:
{noformat}
[2015-10-15 14:50:34,142] ERROR Closing socket for /10.3.19 because of error (kafka.network.Processor)
java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Defective token detected (Mechanism level: GSSHeader did not find the right tag)]
        at org.apache.kafka.common.network.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:190)
        at org.apache.kafka.common.network.Channel.connect(Channel.java:71)
        at kafka.network.Processor.handshake(SocketServer.scala:520)
        at kafka.network.Processor.run(SocketServer.scala:409)
        at java.lang.Thread.run(Thread.java:745)
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Defective token detected (Mechanism level: GSSHeader did not find the right tag)]
        at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:177)
        at org.apache.kafka.common.network.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:180)
        ... 4 more
Caused by: GSSException: Defective token detected (Mechanism level: GSSHeader did not find the right tag)
        at sun.security.jgss.GSSHeader.<init>(GSSHeader.java:97)
        at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:306)
        at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:285)
        at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:155)
        ... 5 more
{noformat}
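For context (an assumption about the fix direction, not Flume's code at this point): the warning "Property security.protocol is not valid" comes from the old Scala producer, which rejects configuration keys it does not know. The 0.9+ Kafka Java client accepts security.protocol, with Kerberos credentials supplied through a JAAS file passed via -Djava.security.auth.login.config. A sketch with placeholder broker and topic names:

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SaslProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:6667");   // placeholder host
    props.put("security.protocol", "SASL_PLAINTEXT"); // "PLAINTEXTSASL" on HDP's patched 0.8.2
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    // Kerberos principal and keytab come from the JAAS config, e.g.
    // -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
    KafkaProducer<String, String> producer =
        new KafkaProducer<String, String>(props);
    producer.send(new ProducerRecord<String, String>("test-topic", "hello"));
    producer.close();
  }
}
{code}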