[
https://issues.apache.org/jira/browse/FLINK-23360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rajiya Mulani updated FLINK-23360:
----------------------------------
Description:
# Article link I followed to configure Event hub.
[https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs]
(TLS/SSL)
# code
Configurations in .properties file
h2. Properties For Azure Event Hub with Kafka
spring.kafka.bootstrap.servers=******.servicebus.windows.net:9093
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://********.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************";
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
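A note on the layout of the sasl.jaas.config entry: if the value is spread over several physical lines in the .properties file itself (rather than only wrapped by this email), java.util.Properties treats each line as a separate entry unless the previous line ends with a backslash. The sketch below illustrates that behaviour with placeholder values; the class name JaasPropertyLayout and the helper read are hypothetical, and it assumes the file is loaded with java.util.Properties, which this report does not confirm.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the reported application.
final class JaasPropertyLayout {

    /** Parses .properties text and returns the value stored under the given key. */
    static String read(String text, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String key = "spring.kafka.properties.sasl.jaas.config";

        // Value split over three lines WITHOUT continuation:
        // only the first line ends up under the key.
        String split = key + "=org.apache.kafka.common.security.plain.PlainLoginModule\n"
                + "required username=\"$ConnectionString\"\n"
                + "password=\"<connection-string>\";\n";

        // Same value with a trailing backslash on each continued line:
        // the three lines are joined into one logical value.
        String continued = key + "=org.apache.kafka.common.security.plain.PlainLoginModule \\\n"
                + "    required username=\"$ConnectionString\" \\\n"
                + "    password=\"<connection-string>\";\n";

        System.out.println("split:     " + read(split, key));
        System.out.println("continued: " + read(continued, key));
    }
}
```

In the split variant the lines starting with "required" and "password" become their own keys, so the login module would receive no credentials.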
POM File :
4.0.0
*******-processor*
*0.0.1-SNAPSHOT*
*******-common*
*jar*
*<project.root.directory>${basedir}/..</project.root.directory>*
*com.*.iot
*****-processor-data
org.apache.kafka:kafka-clients:0.11.0.0
org.apache.commons:commons-lang3:3.9
org.apache.commons:commons-io:1.3.2
commons-configuration:commons-configuration:1.10
Kafka producer Configurations :
@Bean
public ProducerFactory<String, byte[]> postDecoderFactory() throws FileNotFoundException {
    logger.info("BootStrap Server ");
    logger.info(new CommonUtility().getPropertyValue("spring.kafka.bootstrap.servers"));
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("spring.kafka.bootstrap.servers"));
    //configProps.put(ProducerConfig.ACKS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerAcks"));
    //configProps.put(ProducerConfig.RETRIES_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerRetries"));
    //configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBatchSize"));
    //configProps.put(ProducerConfig.LINGER_MS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerLingerMs"));
    //configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBufferMemory"));
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeySerializer"));
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("byteMessageSerializer"));
    //configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerCompressionType"));
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, byte[]> byteArrayKafkaTemplate() throws FileNotFoundException {
    return new KafkaTemplate<>(postDecoderFactory());
}
Error :
2021-07-12 16:09:55.111 WARN 13288 --- [ntainer#2-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-m2m-index-preprocessor-topic-cg-1, groupId=m2m-index-preprocessor-topic-cg] Bootstrap broker *******.servicebus.windows.net:9093 (id: -1 rack: null) disconnected
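The producer factory posted above puts only the bootstrap servers and the two serializers into configProps, and the WARN line comes from a consumer whose configuration is not shown. One possible cause, offered as an assumption rather than a confirmed diagnosis, is that the SASL_SSL settings from the .properties file never reach the Kafka client. The sketch below shows how those three settings could be added to a client config map. The class EventHubsSaslProps and the method withSasl are hypothetical; the keys are the standard Kafka client property names (the same strings that CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SaslConfigs.SASL_MECHANISM and SaslConfigs.SASL_JAAS_CONFIG resolve to), written as literals so the example needs no Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the reported application.
final class EventHubsSaslProps {

    /**
     * Returns a copy of the given client config with the SASL_SSL / PLAIN
     * settings that the Event Hubs Kafka endpoint expects on port 9093.
     * The connection string is passed in by the caller and never hard-coded.
     */
    static Map<String, Object> withSasl(Map<String, Object> base, String connectionString) {
        Map<String, Object> props = new HashMap<>(base);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // The user name is the literal text $ConnectionString, as in the properties above.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"$ConnectionString\" "
                        + "password=\"" + connectionString + "\";");
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        // Placeholder host and secret; substitute real values from configuration.
        base.put("bootstrap.servers", "<namespace>.servicebus.windows.net:9093");
        Map<String, Object> props = withSasl(base, "<connection-string>");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting map could be handed to DefaultKafkaProducerFactory and to the consumer factory in the same way; both sides would need the settings, since the log line is emitted by a consumer.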
was:
# Article link I followed to configure Event hub.
[https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs]
(TLS/SSL)
# code
Configurations in .properties file
h2. Properties For Azure Event Hub with Kafka
kafkaConnectionUrl=******.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://******.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*************************";
POM File :
4.0.0
*******-processor*
*0.0.1-SNAPSHOT*
*******-common*
*jar*
*<project.root.directory>${basedir}/..</project.root.directory>*
*com.*.iot
*****-processor-data
org.apache.kafka:kafka-clients:0.11.0.0
org.apache.commons:commons-lang3:3.9
org.apache.commons:commons-io:1.3.2
commons-configuration:commons-configuration:1.10
Kafka producer Configurations :
@Bean
public Producer<String, IndexMessage> IndexMessageKafkaProducer() {
    logger.info("BootStrap Server ");
    logger.info(new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ProducerConfig.ACKS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerAcks"));
    configProps.put(ProducerConfig.RETRIES_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerRetries"));
    configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBatchSize"));
    configProps.put(ProducerConfig.LINGER_MS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerLingerMs"));
    configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBufferMemory"));
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeySerializer"));
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("IndexMessageSerializer"));
}
Kafka consumer Configurations:
@Bean
public Consumer<String, IndexMessage> IndexMessageKafkaConsumer() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, new CommonUtility().getPropertyValue("indexConsumerGroupid"));
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, new CommonUtility().getPropertyValue("offset"));
    configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, new CommonUtility().getPropertyValue("autoCommit"));
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeyDeserializer"));
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("IndexMessageDeserializer"));
    System.out.println("Inside Consumer Configuration");
    return new KafkaConsumer<>(configProps);
}
> Facing broker disconnected issue while establishing Kafka with Azure Event
> hubs in Spring boot application #543
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-23360
> URL: https://issues.apache.org/jira/browse/FLINK-23360
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Runtime / Configuration
> Reporter: Rajiya Mulani
> Priority: Critical