[ 
https://issues.apache.org/jira/browse/FLUME-3378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614605#comment-17614605
 ] 

Shahar Frank commented on FLUME-3378:
-------------------------------------

I don't have the code I tested that with anymore so can't be sure. I'll just 
close it now.

> Apache Flume Java client fails to start with a single Kafka sink
> ----------------------------------------------------------------
>
>                 Key: FLUME-3378
>                 URL: https://issues.apache.org/jira/browse/FLUME-3378
>             Project: Flume
>          Issue Type: Bug
>          Components: Configuration
>    Affects Versions: 1.9.0
>            Reporter: Shahar Frank
>            Priority: Major
>
> {{Using `_org.apache.flume.agent.embedded.EmbeddedAgent_`.}}
> {{ Configuration as such:}}
> {code:java}
> Map<String, String> configurationProperties = ...;
>  service.configure(configurationProperties);{code}
> {{Where `configurationProperties` is set with:}}
> {code:java}
> { "kafkaSink.kafka.producer.reconnect.backoff.max.ms": "30000", 
> "processor.type": "load_balance", "sinks": "kafkaSink1", 
> "channel.keep-alive": "0", "channel.checkpointDir": 
> "********************************", 
> "kafkaSink.kafka.producer.reconnect.backoff.ms": "2000", "channel.dataDirs": 
> "********************************", 
> "kafkaSink.kafka.producer.retry.backoff.ms": "1000", 
> "processor.selector.maxTimeOut": "60000", 
> "kafkaSink.kafka.producer.max.request.size": "5485760", 
> "kafkaSink1.flumeBatchSize": "1000", 
> "kafkaSink.kafka.producer.buffer.memory": "67108864", 
> "kafkaSink.kafka.producer.client.id": "********************************", 
> "kafkaSink1.useFlumeEventFormat": "true", "kafkaSink1.kafka.topic": 
> "********************************", "kafkaSink.kafka.producer.batch.size": 
> "8196", "channel.kafka.dataDirs": "********************************", 
> "kafkaSink1.type": "KAFKA", "channel.backupCheckpointDir": 
> "********************************", "kafkaSink1.allowTopicOverride": "true", 
> "channel.useDualCheckpoints": "true", 
> "kafkaSink.kafka.producer.compression.type": "lz4", "processor.maxBackoff": 
> "60000", "use_dual_channel": "true", "channel.capacity": "1000000", 
> "channel.byteCapacityBufferPercentage": "50", "channel.transactionCapacity": 
> "1000", "channel.byteCapacity": "10485760", "channel.type": "file", 
> "processor.backoff": "true", "channel.kafka.checkpointDir": 
> "********************************", "channel.kafka.backupCheckpointDir": 
> "********************************", "kafkaSink1.kafka.bootstrap.servers": 
> "********************************", "kafkaSink.kafka.producer.acks": "-1" 
> }{code}
> At runtime it throw the following:
> {code:java}
> java.lang.NullPointerException
>  at 
> org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52)
>  at 
> org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:927)
>  at 
> org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:384)
>  at 
> org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:228)
>  at 
> org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:153)
>  at 
> org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:133)
>  at 
> org.apache.flume.agent.embedded.MemoryConfigurationProvider.getFlumeConfiguration(MemoryConfigurationProvider.java:45)
>  at 
> org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
>  at 
> org.apache.flume.agent.embedded.MaterializedConfigurationProvider.get(MaterializedConfigurationProvider.java:40)
>  at 
> org.apache.flume.agent.embedded.EmbeddedAgent.doConfigure(EmbeddedAgent.java:161)
>  at 
> org.apache.flume.agent.embedded.EmbeddedAgent.configure(EmbeddedAgent.java:99)
>  at *****************.startService(*****************){code}
> And flume will not start whatsoever.
> The code for 
> `_org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52_`
>  shows it is looking for property `sinks` which is not empty so it shouldn't 
> throw any such error...
> Anyone knows why? No documentation about any of that....



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to