Each sink can have its own producer (different sinks can have different Kafka configurations). But overall, yes, we could have a shared producer.
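Roughly the kind of cache I mean (untested sketch, not KafkaIO's actual code; it is generic over the client type rather than tied to KafkaProducer, and the string config key and expiry handling are placeholder choices):

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

// Sketch of a cache of shared clients keyed by their configuration.
// In KafkaIO the cached value would be a KafkaProducer and the closer
// would call producer.close(); kept generic here so the idea stands alone.
class ExpiringCache<V> {
  private static final class Entry<T> {
    final T value;
    volatile long lastUsedMillis;
    Entry(T value, long now) { this.value = value; this.lastUsedMillis = now; }
  }

  private final Map<String, Entry<V>> entries = new ConcurrentHashMap<>();
  private final Function<String, V> factory;  // creates a client for a config key
  private final Consumer<V> closer;           // closes an expired client
  private final long expiryMillis;

  ExpiringCache(Function<String, V> factory, Consumer<V> closer, long expiryMillis) {
    this.factory = factory;
    this.closer = closer;
    this.expiryMillis = expiryMillis;
  }

  // Returns the shared client for this config key, creating it on first use.
  V get(String configKey) {
    Entry<V> e = entries.computeIfAbsent(
        configKey, k -> new Entry<>(factory.apply(k), System.currentTimeMillis()));
    e.lastUsedMillis = System.currentTimeMillis();
    return e.value;
  }

  // Close and drop entries unused for longer than expiryMillis (in a real
  // implementation this would be driven by a background thread).
  void expireIdle() {
    long now = System.currentTimeMillis();
    Iterator<Map.Entry<String, Entry<V>>> it = entries.entrySet().iterator();
    while (it.hasNext()) {
      Entry<V> e = it.next().getValue();
      if (now - e.lastUsedMillis > expiryMillis) {
        it.remove();
        closer.accept(e.value);
      }
    }
  }
}
```

A sink would call get() with its config key on each bundle and get the same producer back instead of paying an open/close per bundle; a sweeper thread (not shown) would call expireIdle() every few seconds. The KafkaProducer javadoc says the producer is thread safe, so sharing one instance across sinks with identical configuration should be safe.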
On Wed, Jun 22, 2016 at 1:34 PM, Jesse Anderson <[email protected]> wrote:

> @Raghu is there a reason why you'd need more than one KafkaProducer per
> process? Maybe the KafkaProducer could be static.
>
> @JB how are you handling this in the JMS IO?
>
> On Wed, Jun 22, 2016 at 12:36 PM Aljoscha Krettek <[email protected]> wrote:
>
>> The Flink runner sits at the other end of the spectrum, in that
>> everything is one big bundle. So the Kafka Sink should work well with that.
>>
>> On Wed, 22 Jun 2016 at 20:30 Raghu Angadi <[email protected]> wrote:
>>
>>> Yeah, this is pretty bad right now. There is a proposal to add an API to
>>> DoFn that makes it easy to keep state across multiple bundles.
>>>
>>> I think the current sink is pretty unusable in this state. We could use
>>> a cache of producers (that expire if unused for a few seconds).
>>>
>>> On Wed, Jun 22, 2016 at 9:12 AM, Jesse Anderson <[email protected]> wrote:
>>>
>>>> The KafkaIO KafkaProducer seems to be closing after every send. On the
>>>> next send, the KafkaProducer is opened again.
>>>>
>>>> Here is the log output:
>>>> 2016-06-22 11:04:49,356 WARN KafkaIO:714 - Looks like generateSplits() is not called. Generate single split.
>>>> 2016-06-22 11:04:49,399 INFO ConsumerConfig:165 - ConsumerConfig values:
>>>>     metric.reporters = []
>>>>     metadata.max.age.ms = 300000
>>>>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>>     group.id =
>>>>     partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>>>>     reconnect.backoff.ms = 50
>>>>     sasl.kerberos.ticket.renew.window.factor = 0.8
>>>>     max.partition.fetch.bytes = 1048576
>>>>     bootstrap.servers = [broker1:9092]
>>>>     retry.backoff.ms = 100
>>>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>>     sasl.kerberos.service.name = null
>>>>     sasl.kerberos.ticket.renew.jitter = 0.05
>>>>     ssl.keystore.type = JKS
>>>>     ssl.trustmanager.algorithm = PKIX
>>>>     enable.auto.commit = false
>>>>     ssl.key.password = null
>>>>     fetch.max.wait.ms = 500
>>>>     sasl.kerberos.min.time.before.relogin = 60000
>>>>     connections.max.idle.ms = 540000
>>>>     ssl.truststore.password = null
>>>>     session.timeout.ms = 30000
>>>>     metrics.num.samples = 2
>>>>     client.id =
>>>>     ssl.endpoint.identification.algorithm = null
>>>>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>>     ssl.protocol = TLS
>>>>     check.crcs = true
>>>>     request.timeout.ms = 40000
>>>>     ssl.provider = null
>>>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>>     ssl.keystore.location = null
>>>>     heartbeat.interval.ms = 3000
>>>>     auto.commit.interval.ms = 5000
>>>>     receive.buffer.bytes = 524288
>>>>     ssl.cipher.suites = null
>>>>     ssl.truststore.type = JKS
>>>>     security.protocol = PLAINTEXT
>>>>     ssl.truststore.location = null
>>>>     ssl.keystore.password = null
>>>>     ssl.keymanager.algorithm = SunX509
>>>>     metrics.sample.window.ms = 30000
>>>>     fetch.min.bytes = 1
>>>>     send.buffer.bytes = 131072
>>>>     auto.offset.reset = earliest
>>>>
>>>> 2016-06-22 11:04:49,681 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:04:49,681 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>>>> 2016-06-22 11:04:49,865 INFO KafkaIO:692 - Partitions assigned to split 0 (total 1): eventsim-0
>>>> 2016-06-22 11:04:49,921 INFO ConsumerConfig:165 - ConsumerConfig values:
>>>>     [identical to the ConsumerConfig dump above]
>>>> 2016-06-22 11:04:49,924 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:04:49,924 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>>>> 2016-06-22 11:04:49,926 INFO KafkaIO:937 - Reader-0: resuming eventsim-0 at default offset
>>>> 2016-06-22 11:04:49,934 INFO ConsumerConfig:165 - ConsumerConfig values:
>>>>     [identical to the ConsumerConfig dump above, except
>>>>     group.id = Reader-0_offset_consumer_698533492_none]
>>>> 2016-06-22 11:04:49,941 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:04:49,942 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>>>> 2016-06-22 11:04:50,819 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
>>>> 2016-06-22 11:04:50,884 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
>>>> 2016-06-22 11:04:50,931 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
>>>> 2016-06-22 11:04:50,992 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
>>>> 2016-06-22 11:04:51,061 INFO AbstractCoordinator:529 - Marking the coordinator 2147483647 dead.
>>>> 2016-06-22 11:04:51,212 INFO KafkaIO:1013 - Reader-0: first record offset 0
>>>> HERE
>>>> HERE
>>>> HERE
>>>> 2016-06-22 11:04:51,529 INFO ProducerConfig:165 - ProducerConfig values:
>>>>     compression.type = none
>>>>     metric.reporters = []
>>>>     metadata.max.age.ms = 300000
>>>>     metadata.fetch.timeout.ms = 60000
>>>>     reconnect.backoff.ms = 50
>>>>     sasl.kerberos.ticket.renew.window.factor = 0.8
>>>>     bootstrap.servers = [broker1:9092]
>>>>     retry.backoff.ms = 100
>>>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>>     buffer.memory = 33554432
>>>>     timeout.ms = 30000
>>>>     key.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>>>>     sasl.kerberos.service.name = null
>>>>     sasl.kerberos.ticket.renew.jitter = 0.05
>>>>     ssl.keystore.type = JKS
>>>>     ssl.trustmanager.algorithm = PKIX
>>>>     block.on.buffer.full = false
>>>>     ssl.key.password = null
>>>>     max.block.ms = 60000
>>>>     sasl.kerberos.min.time.before.relogin = 60000
>>>>     connections.max.idle.ms = 540000
>>>>     ssl.truststore.password = null
>>>>     max.in.flight.requests.per.connection = 5
>>>>     metrics.num.samples = 2
>>>>     client.id =
>>>>     ssl.endpoint.identification.algorithm = null
>>>>     ssl.protocol = TLS
>>>>     request.timeout.ms = 30000
>>>>     ssl.provider = null
>>>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>>     acks = 1
>>>>     batch.size = 16384
>>>>     ssl.keystore.location = null
>>>>     receive.buffer.bytes = 32768
>>>>     ssl.cipher.suites = null
>>>>     ssl.truststore.type = JKS
>>>>     security.protocol = PLAINTEXT
>>>>     retries = 3
>>>>     max.request.size = 1048576
>>>>     value.serializer = class org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>>>>     ssl.truststore.location = null
>>>>     ssl.keystore.password = null
>>>>     ssl.keymanager.algorithm = SunX509
>>>>     metrics.sample.window.ms = 30000
>>>>     partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>>>>     send.buffer.bytes = 131072
>>>>     linger.ms = 0
>>>>
>>>> 2016-06-22 11:04:51,574 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:04:51,574 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>>>> 2016-06-22 11:04:51,671 WARN NetworkClient:582 - Error while fetching metadata with correlation id 0 : {eventsimoutput=LEADER_NOT_AVAILABLE}
>>>> 2016-06-22 11:04:51,797 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>>>> HERE
>>>> 2016-06-22 11:05:13,247 INFO ProducerConfig:165 - ProducerConfig values:
>>>>     [identical to the ProducerConfig dump above]
>>>> 2016-06-22 11:05:13,252 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:05:13,252 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>>>> 2016-06-22 11:05:13,361 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>>>> HERE
>>>> 2016-06-22 11:05:15,003 INFO ProducerConfig:165 - ProducerConfig values:
>>>>     [identical to the ProducerConfig dump above]
>>>> 2016-06-22 11:05:15,008 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:05:15,008 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>>>> 2016-06-22 11:05:15,120 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>>>> HERE
>>>> 2016-06-22 11:06:20,735 INFO ProducerConfig:165 - ProducerConfig values:
>>>>     [identical to the ProducerConfig dump above]
>>>> 2016-06-22 11:06:20,743 INFO AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:06:20,744 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
>>>> 2016-06-22 11:06:20,849 INFO KafkaProducer:613 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>>>>
>>>> Thanks,
>>>>
>>>> Jesse
