I share the same connection factory/MessageProducer across the different writers (optional).
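For reference, the sharing described above can be sketched roughly like this (a minimal, hypothetical sketch: `SharedProducerHolder` and `Producer` are illustrative stand-ins for a JMS connection factory/MessageProducer, not the actual JmsIO code):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of sharing one producer across writer instances;
// Producer stands in for a real JMS MessageProducer.
public class SharedProducerHolder {
    public interface Producer {
        void send(String message);
    }

    private static final AtomicReference<Producer> SHARED = new AtomicReference<>();

    // Every writer calls this; the underlying producer is created at most
    // once per process and reused afterwards.
    public static Producer get(Supplier<Producer> factory) {
        Producer existing = SHARED.get();
        if (existing != null) {
            return existing;
        }
        SHARED.compareAndSet(null, factory.get());
        return SHARED.get();
    }
}
```

Note this simplified sketch can, under a race, create a second producer and discard it without closing it; a real implementation would also need a close/teardown hook.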

Regards
JB

On 06/22/2016 10:34 PM, Jesse Anderson wrote:
@Raghu is there a reason why you'd need more than one KafkaProducer per
process? Maybe the KafkaProducer could be static.
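The static-producer idea could look roughly like this (illustrative sketch only, not KafkaIO's actual code; `FakeProducer` is a hypothetical stand-in for `org.apache.kafka.clients.producer.KafkaProducer`, which is itself documented as thread-safe):

```java
// Illustrative sketch: one producer per process via the
// initialization-on-demand holder idiom.
public class ProducerSingleton {
    public static class FakeProducer {
        public static int instances = 0;   // counts constructions, for illustration
        FakeProducer() { instances++; }
        public void send(String topic, byte[] value) {
            // a real producer would enqueue a ProducerRecord here
        }
    }

    private static class Holder {
        // The class loader guarantees this runs exactly once, thread-safely.
        static final FakeProducer INSTANCE = new FakeProducer();
    }

    public static FakeProducer get() {
        return Holder.INSTANCE;
    }
}
```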

@JB how are you handling this in the JMS IO?

On Wed, Jun 22, 2016 at 12:36 PM Aljoscha Krettek <[email protected]> wrote:

    The Flink runner sits at the other end of the spectrum, in that
    everything is one big bundle. So the Kafka Sink should work well
    with that.

    On Wed, 22 Jun 2016 at 20:30 Raghu Angadi <[email protected]> wrote:

        Yeah, this is pretty bad right now. There is a proposal to add an
        API to DoFn that makes it easy to keep state across multiple bundles.

        I think the current sink is pretty unusable in this state. We
        could use a cache of producers (that expire if unused for a few
        seconds).
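The proposed expiring cache could be sketched like this (hypothetical sketch under the assumptions above; a real implementation might instead use Guava's `CacheBuilder.expireAfterAccess` with a removal listener that closes the producer):

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a producer cache whose entries expire after
// a period of disuse; P stands in for KafkaProducer.
public class ExpiringProducerCache<P> {
    private static final class Entry<P> {
        final P producer;
        volatile long lastUsedNanos;
        Entry(P producer, long now) { this.producer = producer; this.lastUsedNanos = now; }
    }

    private final Map<String, Entry<P>> cache = new ConcurrentHashMap<>();
    private final long ttlNanos;

    public ExpiringProducerCache(long ttlMillis) {
        this.ttlNanos = ttlMillis * 1_000_000L;
    }

    // Reuse the producer for this config key, creating it only if absent.
    public P get(String configKey, Function<String, P> factory) {
        long now = System.nanoTime();
        evictExpired(now);
        Entry<P> e = cache.computeIfAbsent(configKey, k -> new Entry<>(factory.apply(k), now));
        e.lastUsedNanos = now;
        return e.producer;
    }

    // Drop (and, in a real cache, close) producers unused longer than the TTL.
    private void evictExpired(long now) {
        for (Iterator<Map.Entry<String, Entry<P>>> it = cache.entrySet().iterator(); it.hasNext(); ) {
            if (now - it.next().getValue().lastUsedNanos > ttlNanos) {
                it.remove();
            }
        }
    }
}
```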

        On Wed, Jun 22, 2016 at 9:12 AM, Jesse Anderson <[email protected]> wrote:

            The KafkaIO KafkaProducer seems to be closing after every
            send. On the next send, the KafkaProducer is opened again.

            Here is the log output:
            2016-06-22 11:04:49,356 WARN  KafkaIO:714 - Looks like
            generateSplits() is not called. Generate single split.
            2016-06-22 11:04:49,399 INFO  ConsumerConfig:165 -
            ConsumerConfig values:
            metric.reporters = []
            metadata.max.age.ms = 300000
            value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
            group.id =
            partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
            reconnect.backoff.ms = 50
            sasl.kerberos.ticket.renew.window.factor = 0.8
            max.partition.fetch.bytes = 1048576
            bootstrap.servers = [broker1:9092]
            retry.backoff.ms = 100
            sasl.kerberos.kinit.cmd = /usr/bin/kinit
            sasl.kerberos.service.name = null
            sasl.kerberos.ticket.renew.jitter = 0.05
            ssl.keystore.type = JKS
            ssl.trustmanager.algorithm = PKIX
            enable.auto.commit = false
            ssl.key.password = null
            fetch.max.wait.ms = 500
            sasl.kerberos.min.time.before.relogin = 60000
            connections.max.idle.ms = 540000
            ssl.truststore.password = null
            session.timeout.ms = 30000
            metrics.num.samples = 2
            client.id =
            ssl.endpoint.identification.algorithm = null
            key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
            ssl.protocol = TLS
            check.crcs = true
            request.timeout.ms = 40000
            ssl.provider = null
            ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
            ssl.keystore.location = null
            heartbeat.interval.ms = 3000
            auto.commit.interval.ms = 5000
            receive.buffer.bytes = 524288
            ssl.cipher.suites = null
            ssl.truststore.type = JKS
            security.protocol = PLAINTEXT
            ssl.truststore.location = null
            ssl.keystore.password = null
            ssl.keymanager.algorithm = SunX509
            metrics.sample.window.ms = 30000
            fetch.min.bytes = 1
            send.buffer.bytes = 131072
            auto.offset.reset = earliest

            2016-06-22 11:04:49,681 INFO  AppInfoParser:82 - Kafka
            version : 0.9.0.1
            2016-06-22 11:04:49,681 INFO  AppInfoParser:83 - Kafka
            commitId : 23c69d62a0cabf06
            2016-06-22 11:04:49,865 INFO  KafkaIO:692 - Partitions
            assigned to split 0 (total 1): eventsim-0
            2016-06-22 11:04:49,921 INFO  ConsumerConfig:165 -
            ConsumerConfig values:
            [ConsumerConfig values identical to the first ConsumerConfig block above]

            2016-06-22 11:04:49,924 INFO  AppInfoParser:82 - Kafka
            version : 0.9.0.1
            2016-06-22 11:04:49,924 INFO  AppInfoParser:83 - Kafka
            commitId : 23c69d62a0cabf06
            2016-06-22 11:04:49,926 INFO  KafkaIO:937 - Reader-0:
            resuming eventsim-0 at default offset
            2016-06-22 11:04:49,934 INFO  ConsumerConfig:165 -
            ConsumerConfig values:
            [ConsumerConfig values identical to the first ConsumerConfig block above, except:
            group.id = Reader-0_offset_consumer_698533492_none]

            2016-06-22 11:04:49,941 INFO  AppInfoParser:82 - Kafka
            version : 0.9.0.1
            2016-06-22 11:04:49,942 INFO  AppInfoParser:83 - Kafka
            commitId : 23c69d62a0cabf06
            2016-06-22 11:04:50,819 INFO  AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
            2016-06-22 11:04:50,884 INFO  AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
            2016-06-22 11:04:50,931 INFO  AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
            2016-06-22 11:04:50,992 INFO  AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
            2016-06-22 11:04:51,061 INFO  AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
            2016-06-22 11:04:51,212 INFO  KafkaIO:1013 - Reader-0: first
            record offset 0
            HERE
            HERE
            HERE
            2016-06-22 11:04:51,529 INFO  ProducerConfig:165 -
            ProducerConfig values:
            compression.type = none
            metric.reporters = []
            metadata.max.age.ms = 300000
            metadata.fetch.timeout.ms = 60000
            reconnect.backoff.ms = 50
            sasl.kerberos.ticket.renew.window.factor = 0.8
            bootstrap.servers = [broker1:9092]
            retry.backoff.ms = 100
            sasl.kerberos.kinit.cmd = /usr/bin/kinit
            buffer.memory = 33554432
            timeout.ms = 30000
            key.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
            sasl.kerberos.service.name = null
            sasl.kerberos.ticket.renew.jitter = 0.05
            ssl.keystore.type = JKS
            ssl.trustmanager.algorithm = PKIX
            block.on.buffer.full = false
            ssl.key.password = null
            max.block.ms = 60000
            sasl.kerberos.min.time.before.relogin = 60000
            connections.max.idle.ms = 540000
            ssl.truststore.password = null
            max.in.flight.requests.per.connection = 5
            metrics.num.samples = 2
            client.id =
            ssl.endpoint.identification.algorithm = null
            ssl.protocol = TLS
            request.timeout.ms = 30000
            ssl.provider = null
            ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
            acks = 1
            batch.size = 16384
            ssl.keystore.location = null
            receive.buffer.bytes = 32768
            ssl.cipher.suites = null
            ssl.truststore.type = JKS
            security.protocol = PLAINTEXT
            retries = 3
            max.request.size = 1048576
            value.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
            ssl.truststore.location = null
            ssl.keystore.password = null
            ssl.keymanager.algorithm = SunX509
            metrics.sample.window.ms = 30000
            partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
            send.buffer.bytes = 131072
            linger.ms = 0

            2016-06-22 11:04:51,574 INFO  AppInfoParser:82 - Kafka
            version : 0.9.0.1
            2016-06-22 11:04:51,574 INFO  AppInfoParser:83 - Kafka
            commitId : 23c69d62a0cabf06
            2016-06-22 11:04:51,671 WARN  NetworkClient:582 - Error
            while fetching metadata with correlation id 0 :
            {eventsimoutput=LEADER_NOT_AVAILABLE}
            2016-06-22 11:04:51,797 INFO  KafkaProducer:613 - Closing
            the Kafka producer with timeoutMillis = 9223372036854775807 ms.
            HERE
            2016-06-22 11:05:13,247 INFO  ProducerConfig:165 -
            ProducerConfig values:
            [ProducerConfig values identical to the first ProducerConfig block above]

            2016-06-22 11:05:13,252 INFO  AppInfoParser:82 - Kafka
            version : 0.9.0.1
            2016-06-22 11:05:13,252 INFO  AppInfoParser:83 - Kafka
            commitId : 23c69d62a0cabf06
            2016-06-22 11:05:13,361 INFO  KafkaProducer:613 - Closing
            the Kafka producer with timeoutMillis = 9223372036854775807 ms.
            HERE
            2016-06-22 11:05:15,003 INFO  ProducerConfig:165 -
            ProducerConfig values:
            [ProducerConfig values identical to the first ProducerConfig block above]

            2016-06-22 11:05:15,008 INFO  AppInfoParser:82 - Kafka
            version : 0.9.0.1
            2016-06-22 11:05:15,008 INFO  AppInfoParser:83 - Kafka
            commitId : 23c69d62a0cabf06
            2016-06-22 11:05:15,120 INFO  KafkaProducer:613 - Closing
            the Kafka producer with timeoutMillis = 9223372036854775807 ms.
            HERE
            2016-06-22 11:06:20,735 INFO  ProducerConfig:165 -
            ProducerConfig values:
            [ProducerConfig values identical to the first ProducerConfig block above]

            2016-06-22 11:06:20,743 INFO  AppInfoParser:82 - Kafka
            version : 0.9.0.1
            2016-06-22 11:06:20,744 INFO  AppInfoParser:83 - Kafka
            commitId : 23c69d62a0cabf06
            2016-06-22 11:06:20,849 INFO  KafkaProducer:613 - Closing
            the Kafka producer with timeoutMillis = 9223372036854775807 ms.

            Thanks,

            Jesse



--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
