Rajiya Mulani created FLINK-23360:
-------------------------------------
Summary: Facing broker disconnected issue while establishing Kafka
with Azure Event hubs in Spring boot application #543
Key: FLINK-23360
URL: https://issues.apache.org/jira/browse/FLINK-23360
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka, Runtime / Configuration
Reporter: Rajiya Mulani
# Article I followed to configure Event Hubs for Kafka (TLS/SSL):
[https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs]
# Code
Configuration in the .properties file:
###############################################################################################################
# Properties For Azure Event Hub with Kafka
###############################################################################################################
kafkaConnectionUrl=******.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://******.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*************************";
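For reference, the three security entries above correspond to Kafka client configuration keys of the same names. The following is a minimal sketch, not the reporter's code: the class `EventHubsKafkaSecurity` and its method names are hypothetical, and the connection string passed in is a placeholder. It builds the JAAS line and the SASL_SSL settings as a plain map that could be merged into a producer or consumer configuration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the reported application. */
final class EventHubsKafkaSecurity {

    private EventHubsKafkaSecurity() {}

    /**
     * Builds the sasl.jaas.config value in the shape used in the properties above.
     * The user name is the literal text $ConnectionString; the password is the
     * Event Hubs connection string itself.
     */
    static String jaasConfig(String connectionString) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"$ConnectionString\" "
                + "password=\"" + connectionString + "\";";
    }

    /** Returns the connection and security settings keyed by Kafka client config names. */
    static Map<String, Object> securityProps(String bootstrapServers, String connectionString) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig(connectionString));
        return props;
    }
}
```

A caller would pass the namespace endpoint on port 9093 and the full connection string, then add the serializer and tuning entries to the returned map before constructing the client.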
POM file:
4.0.0
*******-processor*
*0.0.1-SNAPSHOT*
*******-common*
*jar*
*<project.root.directory>${basedir}/..</project.root.directory>*
*com.*.iot
*****-processor-data
Third-party dependencies (groupId:artifactId:version):
org.apache.kafka:kafka-clients:0.11.0.0
org.apache.commons:commons-lang3:3.9
org.apache.commons:commons-io:1.3.2
commons-configuration:commons-configuration:1.10
Kafka producer configuration:
@Bean
public Producer<String, IndexMessage> IndexMessageKafkaProducer() {
    // Read every setting from the .properties file through one helper instance.
    CommonUtility props = new CommonUtility();
    logger.info("BootStrap Server ");
    logger.info(props.getPropertyValue("kafkaConnectionUrl"));
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ProducerConfig.ACKS_CONFIG, props.getPropertyValue("kafkaProducerAcks"));
    configProps.put(ProducerConfig.RETRIES_CONFIG, props.getPropertyValue("kafkaProducerRetries"));
    configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, props.getPropertyValue("kafkaProducerBatchSize"));
    configProps.put(ProducerConfig.LINGER_MS_CONFIG, props.getPropertyValue("kafkaProducerLingerMs"));
    configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, props.getPropertyValue("kafkaProducerBufferMemory"));
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, props.getPropertyValue("kafkaTopicKeySerializer"));
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, props.getPropertyValue("IndexMessageSerializer"));
    // The snippet as posted ends without a return statement; returning a
    // KafkaProducer here is an assumption that mirrors the consumer bean.
    return new KafkaProducer<>(configProps);
}
Kafka consumer configuration:
@Bean
public Consumer<String, IndexMessage> IndexMessageKafkaConsumer() {
    // Read every setting from the .properties file through one helper instance.
    CommonUtility props = new CommonUtility();
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, props.getPropertyValue("indexConsumerGroupid"));
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, props.getPropertyValue("offset"));
    configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, props.getPropertyValue("autoCommit"));
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, props.getPropertyValue("kafkaTopicKeyDeserializer"));
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, props.getPropertyValue("IndexMessageDeserializer"));
    System.out.println("Inside Consumer Configuration");
    return new KafkaConsumer<>(configProps);
}
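As posted, the two bean methods put only the bootstrap servers, serializers, and tuning values into configProps; the security.protocol, sasl.mechanism, and sasl.jaas.config entries from the .properties file are not visibly passed to the Kafka client. One possible way to forward them is sketched here. This is an illustration under assumptions: `KafkaClientProps`, `load`, and `securitySettings` are hypothetical names, and the sketch reads the file with java.util.Properties instead of the application's CommonUtility, whose behavior is not shown.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper for illustration; not part of the reported application. */
final class KafkaClientProps {

    // Kafka client keys that carry the SASL_SSL settings from the properties file.
    private static final String[] SECURITY_KEYS = {
        "security.protocol", "sasl.mechanism", "sasl.jaas.config"
    };

    private KafkaClientProps() {}

    /** Loads a .properties source once so both beans can share the result. */
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Copies only the security entries that are present, leaving other keys alone. */
    static Map<String, Object> securitySettings(Properties fileProps) {
        Map<String, Object> out = new HashMap<>();
        for (String key : SECURITY_KEYS) {
            String value = fileProps.getProperty(key);
            if (value != null) {
                out.put(key, value.trim());
            }
        }
        return out;
    }
}
```

In each bean method, calling `configProps.putAll(KafkaClientProps.securitySettings(fileProps))` before constructing the client would carry these settings across, provided sasl.jaas.config sits on a single line in the file.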
--
This message was sent by Atlassian Jira
(v8.3.4#803005)