Hi,

I'm trying to use Stateful Functions with Kafka as my ingress and egress.
I'm using the Confluent fully-managed Kafka and I'm having a challenge
adding my authentication details in the module.yaml file.
Here is my current config details:
version: "1.0"
module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            type: example/greeter
          spec:
            endpoint: <https-endpoint>
            states:
              - seen_count
            maxNumBatchRequests: 500
            timeout: 2min
    ingresses:
      - ingress:
          meta:
            type: statefun.kafka.io/routable-protobuf-ingress
            id: example/names
          spec:
            address: <confluent-bootstrap-server>
            consumerGroupId: statefun-consumer-group
            topics:
              - topic: names
                typeUrl: com.googleapis/example.GreetRequest
                targets:
                  - example/greeter
            properties:
              - bootstrap.servers:<confluent-bootstrap-server>
              - security.protocol: SASL_SSL
              - sasl.mechanism: PLAIN
              - sasl.jaas.config:
org.apache.kafka.common.security.plain.PlainLoginModule required
username="USERNAME" password="PASSWORD";
              - ssl.endpoint.identification.algorithm: https
    egresses:
      - egress:
          meta:
            type: statefun.kafka.io/generic-egress
            id: example/greets
          spec:
            address: <confluent-bootstrap-server>
            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 100000
            properties:
              - bootstrap.servers: <confluent-bootstrap-server>
              - security.protocol: SASL_SSL
              - sasl.mechanisms: PLAIN
              - sasl.jaas.config:
org.apache.kafka.common.security.plain.PlainLoginModule required
username="USERNAME" password="PASSWORD";
              - ssl.endpoint.identification.algorithm: https

After running docker-compose with a master and worker containers I'm
getting this error:
Could not find a 'KafkaClient' entry in the JAAS configuration. System
property 'java.security.auth.login.config' is
/tmp/jaas-2846080966990890307.conf

The producer config logged :
worker_1  | 2020-10-07 13:38:08,489 INFO
 org.apache.kafka.clients.producer.ProducerConfig              -
ProducerConfig values:
worker_1  |     acks = 1
worker_1  |     batch.size = 16384
worker_1  |     bootstrap.servers = [https://
---.asia-southeast1.gcp.confluent.cloud:9092]
worker_1  |     buffer.memory = 33554432
worker_1  |     client.dns.lookup = default
worker_1  |     client.id =
worker_1  |     compression.type = none
worker_1  |     connections.max.idle.ms = 540000
worker_1  |     delivery.timeout.ms = 120000
worker_1  |     enable.idempotence = false
worker_1  |     interceptor.classes = []
worker_1  |     key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
worker_1  |     linger.ms = 0
worker_1  |     max.block.ms = 60000
worker_1  |     max.in.flight.requests.per.connection = 5
worker_1  |     max.request.size = 1048576
worker_1  |     metadata.max.age.ms = 300000
worker_1  |     metric.reporters = []
worker_1  |     metrics.num.samples = 2
worker_1  |     metrics.recording.level = INFO
worker_1  |     metrics.sample.window.ms = 30000
worker_1  |     partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
worker_1  |     receive.buffer.bytes = 32768
worker_1  |     reconnect.backoff.max.ms = 1000
worker_1  |     reconnect.backoff.ms = 50
worker_1  |     request.timeout.ms = 30000
worker_1  |     retries = 2147483647
worker_1  |     retry.backoff.ms = 100
worker_1  |     sasl.client.callback.handler.class = null
worker_1  |     sasl.jaas.config = null
worker_1  |     sasl.kerberos.kinit.cmd = /usr/bin/kinit
worker_1  |     sasl.kerberos.min.time.before.relogin = 60000
worker_1  |     sasl.kerberos.service.name = null
worker_1  |     sasl.kerberos.ticket.renew.jitter = 0.05
worker_1  |     sasl.kerberos.ticket.renew.window.factor = 0.8
worker_1  |     sasl.login.callback.handler.class = null
worker_1  |     sasl.login.class = null
worker_1  |     sasl.login.refresh.buffer.seconds = 300
worker_1  |     sasl.login.refresh.min.period.seconds = 60
worker_1  |     sasl.login.refresh.window.factor = 0.8
worker_1  |     sasl.login.refresh.window.jitter = 0.05
worker_1  |     sasl.mechanism = GSSAPI
worker_1  |     security.protocol = SASL_SSL
worker_1  |     send.buffer.bytes = 131072
worker_1  |     ssl.cipher.suites = null
worker_1  |     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
worker_1  |     ssl.endpoint.identification.algorithm = https
worker_1  |     ssl.key.password = null
worker_1  |     ssl.keymanager.algorithm = SunX509
worker_1  |     ssl.keystore.location = null
worker_1  |     ssl.keystore.password = null
worker_1  |     ssl.keystore.type = JKS
worker_1  |     ssl.protocol = TLS
worker_1  |     ssl.provider = null
worker_1  |     ssl.secure.random.implementation = null
worker_1  |     ssl.trustmanager.algorithm = PKIX
worker_1  |     ssl.truststore.location = null
worker_1  |     ssl.truststore.password = null
worker_1  |     ssl.truststore.type = JKS
worker_1  |     transaction.timeout.ms = 100000
worker_1  |     transactional.id = null
worker_1  |     value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
worker_1  |

Is there something that I'm missing?

Reply via email to