Each sink can have its own producer (different sinks can have different Kafka
configurations).
But overall, yes, we could have a shared producer.
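
A minimal sketch of that shared-producer idea (the registry and `StubProducer` here are hypothetical, standing in for `org.apache.kafka.clients.producer.KafkaProducer`, which is thread-safe and so can be shared): key a static map by the producer configuration, so sinks with identical configs reuse one instance while differently configured sinks still get their own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for org.apache.kafka.clients.producer.KafkaProducer,
// used only so this sketch is self-contained.
class StubProducer {
    final Map<String, Object> config;
    StubProducer(Map<String, Object> config) { this.config = config; }
}

// One producer per distinct configuration, shared by all sinks in the process.
// computeIfAbsent creates a producer only on first use of a given config.
class SharedProducers {
    private static final ConcurrentHashMap<Map<String, Object>, StubProducer> CACHE =
            new ConcurrentHashMap<>();

    static StubProducer getOrCreate(Map<String, Object> config,
                                    Function<Map<String, Object>, StubProducer> factory) {
        return CACHE.computeIfAbsent(config, factory);
    }
}
```

This avoids the per-bundle create/close churn shown in the log below, at the cost of deciding when (if ever) the shared producers are closed.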

On Wed, Jun 22, 2016 at 1:34 PM, Jesse Anderson <[email protected]>
wrote:

> @Raghu is there a reason why you'd need more than one KafkaProducer per
> process? Maybe the KafkaProducer could be static.
>
> @JB how are you handling this in the JMS IO?
>
> On Wed, Jun 22, 2016 at 12:36 PM Aljoscha Krettek <[email protected]>
> wrote:
>
>> The Flink runner sits at the other end of the spectrum, in that
>> everything is one big bundle. So the Kafka sink should work well with that.
>>
>> On Wed, 22 Jun 2016 at 20:30 Raghu Angadi <[email protected]> wrote:
>>
>>> Yeah, this is pretty bad right now. There is a proposal to add an API to
>>> DoFn that makes it easy to keep state across multiple bundles.
>>>
>>> I think the current sink is pretty unusable in this state. We could use
>>> a cache of producers (that expire if unused for a few seconds).
>>>
>>> On Wed, Jun 22, 2016 at 9:12 AM, Jesse Anderson <[email protected]>
>>> wrote:
>>>
>>>> The KafkaIO KafkaProducer seems to be closing after every send. On the
>>>> next send, the KafkaProducer is opened again.
>>>>
>>>> Here is the log output:
>>>> 2016-06-22 11:04:49,356 WARN  KafkaIO:714 - Looks like generateSplits()
>>>> is not called. Generate single split.
>>>> 2016-06-22 11:04:49,399 INFO  ConsumerConfig:165 - ConsumerConfig
>>>> values:
>>>> metric.reporters = []
>>>> metadata.max.age.ms = 300000
>>>> value.deserializer = class
>>>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>> group.id =
>>>> partition.assignment.strategy =
>>>> [org.apache.kafka.clients.consumer.RangeAssignor]
>>>> reconnect.backoff.ms = 50
>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>>> max.partition.fetch.bytes = 1048576
>>>> bootstrap.servers = [broker1:9092]
>>>> retry.backoff.ms = 100
>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>> sasl.kerberos.service.name = null
>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>>> ssl.keystore.type = JKS
>>>> ssl.trustmanager.algorithm = PKIX
>>>> enable.auto.commit = false
>>>> ssl.key.password = null
>>>> fetch.max.wait.ms = 500
>>>> sasl.kerberos.min.time.before.relogin = 60000
>>>> connections.max.idle.ms = 540000
>>>> ssl.truststore.password = null
>>>> session.timeout.ms = 30000
>>>> metrics.num.samples = 2
>>>> client.id =
>>>> ssl.endpoint.identification.algorithm = null
>>>> key.deserializer = class
>>>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>> ssl.protocol = TLS
>>>> check.crcs = true
>>>> request.timeout.ms = 40000
>>>> ssl.provider = null
>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>> ssl.keystore.location = null
>>>> heartbeat.interval.ms = 3000
>>>> auto.commit.interval.ms = 5000
>>>> receive.buffer.bytes = 524288
>>>> ssl.cipher.suites = null
>>>> ssl.truststore.type = JKS
>>>> security.protocol = PLAINTEXT
>>>> ssl.truststore.location = null
>>>> ssl.keystore.password = null
>>>> ssl.keymanager.algorithm = SunX509
>>>> metrics.sample.window.ms = 30000
>>>> fetch.min.bytes = 1
>>>> send.buffer.bytes = 131072
>>>> auto.offset.reset = earliest
>>>>
>>>> 2016-06-22 11:04:49,681 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:04:49,681 INFO  AppInfoParser:83 - Kafka commitId :
>>>> 23c69d62a0cabf06
>>>> 2016-06-22 11:04:49,865 INFO  KafkaIO:692 - Partitions assigned to
>>>> split 0 (total 1): eventsim-0
>>>> 2016-06-22 11:04:49,921 INFO  ConsumerConfig:165 - ConsumerConfig
>>>> values:
>>>> [same ConsumerConfig values as the first dump above]
>>>>
>>>> 2016-06-22 11:04:49,924 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:04:49,924 INFO  AppInfoParser:83 - Kafka commitId :
>>>> 23c69d62a0cabf06
>>>> 2016-06-22 11:04:49,926 INFO  KafkaIO:937 - Reader-0: resuming
>>>> eventsim-0 at default offset
>>>> 2016-06-22 11:04:49,934 INFO  ConsumerConfig:165 - ConsumerConfig
>>>> values:
>>>> [same ConsumerConfig values as the first dump above, except:]
>>>> group.id = Reader-0_offset_consumer_698533492_none
>>>>
>>>> 2016-06-22 11:04:49,941 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:04:49,942 INFO  AppInfoParser:83 - Kafka commitId :
>>>> 23c69d62a0cabf06
>>>> 2016-06-22 11:04:50,819 INFO  AbstractCoordinator:529 - Marking the
>>>> coordinator 2147483647 dead.
>>>> 2016-06-22 11:04:50,884 INFO  AbstractCoordinator:529 - Marking the
>>>> coordinator 2147483647 dead.
>>>> 2016-06-22 11:04:50,931 INFO  AbstractCoordinator:529 - Marking the
>>>> coordinator 2147483647 dead.
>>>> 2016-06-22 11:04:50,992 INFO  AbstractCoordinator:529 - Marking the
>>>> coordinator 2147483647 dead.
>>>> 2016-06-22 11:04:51,061 INFO  AbstractCoordinator:529 - Marking the
>>>> coordinator 2147483647 dead.
>>>> 2016-06-22 11:04:51,212 INFO  KafkaIO:1013 - Reader-0: first record
>>>> offset 0
>>>> HERE
>>>> HERE
>>>> HERE
>>>> 2016-06-22 11:04:51,529 INFO  ProducerConfig:165 - ProducerConfig
>>>> values:
>>>> compression.type = none
>>>> metric.reporters = []
>>>> metadata.max.age.ms = 300000
>>>> metadata.fetch.timeout.ms = 60000
>>>> reconnect.backoff.ms = 50
>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>>> bootstrap.servers = [broker1:9092]
>>>> retry.backoff.ms = 100
>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>> buffer.memory = 33554432
>>>> timeout.ms = 30000
>>>> key.serializer = class
>>>> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>>>> sasl.kerberos.service.name = null
>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>>> ssl.keystore.type = JKS
>>>> ssl.trustmanager.algorithm = PKIX
>>>> block.on.buffer.full = false
>>>> ssl.key.password = null
>>>> max.block.ms = 60000
>>>> sasl.kerberos.min.time.before.relogin = 60000
>>>> connections.max.idle.ms = 540000
>>>> ssl.truststore.password = null
>>>> max.in.flight.requests.per.connection = 5
>>>> metrics.num.samples = 2
>>>> client.id =
>>>> ssl.endpoint.identification.algorithm = null
>>>> ssl.protocol = TLS
>>>> request.timeout.ms = 30000
>>>> ssl.provider = null
>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>> acks = 1
>>>> batch.size = 16384
>>>> ssl.keystore.location = null
>>>> receive.buffer.bytes = 32768
>>>> ssl.cipher.suites = null
>>>> ssl.truststore.type = JKS
>>>> security.protocol = PLAINTEXT
>>>> retries = 3
>>>> max.request.size = 1048576
>>>> value.serializer = class
>>>> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
>>>> ssl.truststore.location = null
>>>> ssl.keystore.password = null
>>>> ssl.keymanager.algorithm = SunX509
>>>> metrics.sample.window.ms = 30000
>>>> partitioner.class = class
>>>> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>>>> send.buffer.bytes = 131072
>>>> linger.ms = 0
>>>>
>>>> 2016-06-22 11:04:51,574 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:04:51,574 INFO  AppInfoParser:83 - Kafka commitId :
>>>> 23c69d62a0cabf06
>>>> 2016-06-22 11:04:51,671 WARN  NetworkClient:582 - Error while fetching
>>>> metadata with correlation id 0 : {eventsimoutput=LEADER_NOT_AVAILABLE}
>>>> 2016-06-22 11:04:51,797 INFO  KafkaProducer:613 - Closing the Kafka
>>>> producer with timeoutMillis = 9223372036854775807 ms.
>>>> HERE
>>>> 2016-06-22 11:05:13,247 INFO  ProducerConfig:165 - ProducerConfig
>>>> values:
>>>> [same ProducerConfig values as the first dump above]
>>>>
>>>> 2016-06-22 11:05:13,252 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:05:13,252 INFO  AppInfoParser:83 - Kafka commitId :
>>>> 23c69d62a0cabf06
>>>> 2016-06-22 11:05:13,361 INFO  KafkaProducer:613 - Closing the Kafka
>>>> producer with timeoutMillis = 9223372036854775807 ms.
>>>> HERE
>>>> 2016-06-22 11:05:15,003 INFO  ProducerConfig:165 - ProducerConfig
>>>> values:
>>>> [same ProducerConfig values as the first dump above]
>>>>
>>>> 2016-06-22 11:05:15,008 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:05:15,008 INFO  AppInfoParser:83 - Kafka commitId :
>>>> 23c69d62a0cabf06
>>>> 2016-06-22 11:05:15,120 INFO  KafkaProducer:613 - Closing the Kafka
>>>> producer with timeoutMillis = 9223372036854775807 ms.
>>>> HERE
>>>> 2016-06-22 11:06:20,735 INFO  ProducerConfig:165 - ProducerConfig
>>>> values:
>>>> [same ProducerConfig values as the first dump above]
>>>>
>>>> 2016-06-22 11:06:20,743 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
>>>> 2016-06-22 11:06:20,744 INFO  AppInfoParser:83 - Kafka commitId :
>>>> 23c69d62a0cabf06
>>>> 2016-06-22 11:06:20,849 INFO  KafkaProducer:613 - Closing the Kafka
>>>> producer with timeoutMillis = 9223372036854775807 ms.
>>>>
>>>> Thanks,
>>>>
>>>> Jesse
>>>>
>>>
>>>
