[ 
https://issues.apache.org/jira/browse/FLINK-23360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajiya Mulani updated FLINK-23360:
----------------------------------
    Description: 
# Article link I followed to configure Event Hubs:
 
[https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs]
 (TLS/SSL)

 # code
 Configurations in .properties file

###############################################################################################################
h2. Properties For Azure Event Hub with Kafka

###############################################################################################################

spring.kafka.bootstrap.servers=******.servicebus.windows.net:9093
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://********.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=**************";
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
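
For reference, the sasl.jaas.config value above has to reach the Kafka client as one string, including the trailing semicolon. A minimal sketch of assembling it in Java follows. The class `JaasConfigSketch` and the placeholder argument are hypothetical and not part of the project described here.

```java
/** Hypothetical helper, not part of the reported project. */
final class JaasConfigSketch {

    /** Login module class named in the properties above. */
    static final String LOGIN_MODULE =
            "org.apache.kafka.common.security.plain.PlainLoginModule";

    /**
     * Builds the single-line sasl.jaas.config value: the literal user name
     * "$ConnectionString" and the Event Hubs connection string as password.
     */
    static String jaasConfig(String connectionString) {
        return LOGIN_MODULE
                + " required username=\"$ConnectionString\""
                + " password=\"" + connectionString + "\";";
    }

    public static void main(String[] args) {
        // Placeholder only; the real connection string comes from the namespace.
        System.out.println(jaasConfig("<connection-string>"));
    }
}
```

Building the value in one place makes it easier to check that no line break or missing semicolon ends up in the string handed to the client.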
POM File :

4.0.0

*******-processor*
 *0.0.1-SNAPSHOT*

*******-common*
 *jar*

*<project.root.directory>${basedir}/..</project.root.directory>*

*com.*.iot
 *****-processor-data

org.apache.kafka:kafka-clients:0.11.0.0
org.apache.commons:commons-lang3:3.9
org.apache.commons:commons-io:1.3.2
commons-configuration:commons-configuration:1.10

Kafka producer Configurations :

@Bean
public ProducerFactory<String, byte[]> postDecoderFactory() throws FileNotFoundException {
    logger.info("BootStrap Server ");
    logger.info(new CommonUtility().getPropertyValue("spring.kafka.bootstrap.servers"));
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("spring.kafka.bootstrap.servers"));
    //configProps.put(ProducerConfig.ACKS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerAcks"));
    //configProps.put(ProducerConfig.RETRIES_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerRetries"));
    //configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBatchSize"));
    //configProps.put(ProducerConfig.LINGER_MS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerLingerMs"));
    //configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBufferMemory"));
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeySerializer"));
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("byteMessageSerializer"));
    //configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerCompressionType"));
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, byte[]> byteArrayKafkaTemplate() throws FileNotFoundException {
    return new KafkaTemplate<>(postDecoderFactory());
}
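
The factory above puts only the bootstrap servers and the two serializers into configProps; the security.protocol, sasl.mechanism and sasl.jaas.config values from the .properties file do not appear in it. A minimal sketch of a map that also carries them follows. The class `SaslClientProps` is hypothetical, and plain string keys are used so the sketch compiles without kafka-clients (they are the Kafka client setting names behind constants such as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG). Whether this is related to the disconnect is not established here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of the reported project. */
final class SaslClientProps {

    /**
     * Returns the connection and SASL settings as a map that could be merged
     * into a producer or consumer configuration.
     */
    static Map<String, Object> build(String bootstrapServers, String jaasConfig) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL"); // value from the .properties file
        props.put("sasl.mechanism", "PLAIN");       // value from the .properties file
        props.put("sasl.jaas.config", jaasConfig);  // single-line JAAS string
        return props;
    }

    public static void main(String[] args) {
        // Placeholder arguments; real values would come from the .properties file.
        System.out.println(build("<namespace>.servicebus.windows.net:9093", "<jaas-config>"));
    }
}
```

The entries could be merged with configProps.putAll(...) before the DefaultKafkaProducerFactory is created. The same settings would be needed on the consumer side, which is where the logged warning comes from.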

 

Error :

2021-07-12 16:09:55.111 WARN 13288 --- [ntainer#2-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-m2m-index-preprocessor-topic-cg-1, groupId=m2m-index-preprocessor-topic-cg] Bootstrap broker *******.servicebus.windows.net:9093 (id: -1 rack: null) disconnected

 

 

  was:
# Article link I followed to configure Event hub.
[https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs]
 (TLS/SSL)

 # code
Configurations in .properties file

###############################################################################################################
h2. Properties For Azure Event Hub with Kafka

###############################################################################################################

kafkaConnectionUrl=******.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://******.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*************************";

POM File :

4.0.0

*******-processor*
*0.0.1-SNAPSHOT*

*******-common*
*jar*

*<project.root.directory>${basedir}/..</project.root.directory>*



*com.*.iot
*****-processor-data


org.apache.kafka:kafka-clients:0.11.0.0
org.apache.commons:commons-lang3:3.9
org.apache.commons:commons-io:1.3.2
commons-configuration:commons-configuration:1.10

Kafka producer Configurations :
@Bean
public Producer<String, IndexMessage> IndexMessageKafkaProducer() {
    logger.info("BootStrap Server ");
    logger.info(new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    Map<String, Object> configProps = new HashMap<>();

    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ProducerConfig.ACKS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerAcks"));
    configProps.put(ProducerConfig.RETRIES_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerRetries"));
    configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBatchSize"));
    configProps.put(ProducerConfig.LINGER_MS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerLingerMs"));
    configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBufferMemory"));
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeySerializer"));
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("IndexMessageSerializer"));
}

Kafka consumer Configurations:

@Bean
public Consumer<String, IndexMessage> IndexMessageKafkaConsumer() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, new CommonUtility().getPropertyValue("indexConsumerGroupid"));
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, new CommonUtility().getPropertyValue("offset"));
    configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, new CommonUtility().getPropertyValue("autoCommit"));

    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeyDeserializer"));
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("IndexMessageDeserializer"));

    System.out.println("Inside Consumer Configuration");
    return new KafkaConsumer<>(configProps);
}


> Facing broker disconnected issue while establishing Kafka with Azure Event 
> hubs in Spring boot application #543
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23360
>                 URL: https://issues.apache.org/jira/browse/FLINK-23360
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Runtime / Configuration
>            Reporter: Rajiya Mulani
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
