dnwe commented on a change in pull request #202: Produce action fixes
URL: 
https://github.com/apache/incubator-openwhisk-package-kafka/pull/202#discussion_r131249653
 
 

 ##########
 File path: action/messageHubProduce.py
 ##########
 @@ -18,64 +18,97 @@
  */
 """
 
-import ssl
 import base64
+import logging
+import ssl
+import sys
+import traceback
+
 from kafka import KafkaProducer
-from kafka.errors import NoBrokersAvailable, KafkaTimeoutError
+from kafka.errors import NoBrokersAvailable, KafkaTimeoutError, 
AuthenticationFailedError
+from kafka.version import __version__
+from random import shuffle
 
 
+logging.basicConfig(stream=sys.stdout, level=logging.INFO,
+        format='%(levelname)-8s %(asctime)s %(message)s',
+        datefmt='[%H:%M:%S]')
+
+max_cached_producers = 10
+
 def main(params):
+    producer = None
+    logging.info("Using kafka-python %s", str(__version__))
+
+    print("Validating parameters")
     validationResult = validateParams(params)
     if validationResult[0] != True:
         return {'error': validationResult[1]}
     else:
         validatedParams = validationResult[1]
 
-    sasl_mechanism = 'PLAIN'
-    security_protocol = 'SASL_SSL'
+    attempt = 0
+    max_attempts = 3
 
-    # Create a new context using system defaults, disable all but TLS1.2
-    context = ssl.create_default_context()
-    context.options &= ssl.OP_NO_TLSv1
-    context.options &= ssl.OP_NO_TLSv1_1
-
-    try:
-        producer = KafkaProducer(
-            api_version=(0, 10),
-            api_version_auto_timeout_ms=15000,
-            bootstrap_servers=validatedParams['kafka_brokers_sasl'],
-            max_block_ms=20000,
-            request_timeout_ms=20000,
-            sasl_plain_username=validatedParams['user'],
-            sasl_plain_password=validatedParams['password'],
-            security_protocol=security_protocol,
-            ssl_context=context,
-            sasl_mechanism=sasl_mechanism)
+    result = {"success": True}
 
-        print("Created producer")
+    while attempt < max_attempts:
+        attempt += 1
+        print("Starting attempt {}".format(attempt))
 
-        # only use the key parameter if it is present
-        if 'key' in validatedParams:
-            messageKey = validatedParams['key']
-            future = producer.send(validatedParams['topic'], 
bytes(validatedParams['value'], 'utf-8'), key=bytes(messageKey, 'utf-8'))
-        else:
-            future = producer.send(validatedParams['topic'], 
bytes(validatedParams['value'], 'utf-8'))
-
-        sent = future.get(timeout=30)
-        msg = "Successfully sent message to {}:{} at offset {}".format(
-            sent.topic, sent.partition, sent.offset)
-
-        print(msg)
-    except KafkaTimeoutError:
-        return {'error': 'Timed out communicating with Message Hub.'}
-    except NoBrokersAvailable:
-        # this exception's message is a little too generic
+        try:
+            print("Getting producer")
+            producer = getProducer(validatedParams)
+
+            topic = validatedParams['topic']
+            print("Finding topic {}".format(topic))
+            partition_info = producer.partitions_for(topic)
+            print("Found topic {} with partition(s) {}".format(topic, 
partition_info))
+
+            break
+        except Exception as e:
+            if attempt == max_attempts:
+                producer = None
+                logging.warning(e)
+                traceback.print_stack(limit=5)
+                result = getResultForException(e)
+
+    # we successfully connected and found the topic metadata... let's send!
+    if producer is not None:
+        try:
+            print("Producing message")
+
+            # only use the key parameter if it is present
+            value = validatedParams['value']
+            if 'key' in validatedParams:
+                messageKey = validatedParams['key']
+                future = producer.send(
+                    topic, bytes(value, 'utf-8'), key=bytes(messageKey, 
'utf-8'))
+            else:
+                future = producer.send(topic, bytes(value, 'utf-8'))
+
+            sent = future.get(timeout=20)
+            msg = "Successfully sent message to {}:{} at offset {}".format(
+                sent.topic, sent.partition, sent.offset)
+            print(msg)
+            result = {"success": True, "message": msg}
+        except Exception as e:
+            logging.warning(e)
+            traceback.print_stack(limit=5)
 
 Review comment:
   Same as the earlier comment about print_exc 
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to