Hi,
this depends on the Runner you're using and how it subdivides the elements
into bundles. The Kafka writer opens and closes a Producer for every bundle,
so if each element ends up in more or less its own bundle, you will get a
new Producer for every element.
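
To make that concrete, here is a rough sketch of what a per-bundle producer
lifecycle looks like in a DoFn. It is illustrative only, not the actual
KafkaIO writer code, and the config map and topic name are placeholders:

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class PerBundleKafkaWriter extends DoFn<KV<String, String>, Void> {
  // Assumed to contain bootstrap.servers plus String serializer classes.
  private final Map<String, Object> producerConfig;
  private transient KafkaProducer<String, String> producer;

  PerBundleKafkaWriter(Map<String, Object> producerConfig) {
    this.producerConfig = producerConfig;
  }

  @StartBundle
  public void startBundle() {
    // A new producer is created at the start of every bundle ...
    producer = new KafkaProducer<>(producerConfig);
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    producer.send(new ProducerRecord<>("my-topic",
        c.element().getKey(), c.element().getValue()));
  }

  @FinishBundle
  public void finishBundle() {
    // ... and flushed and closed when the bundle finishes.
    producer.flush();
    producer.close();
    producer = null;
  }
}

If the Runner hands the writer single-element bundles, that start/finish
cycle runs once per element, which matches the repeated ProducerConfig /
"Closing the Kafka producer" pairs in the log below.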

Cheers,
Aljoscha

On Wed, 22 Jun 2016 at 18:25 Jean-Baptiste Onofré <[email protected]> wrote:

> Hi Jesse,
>
> Gonna take a look, but the Writer could keep a KafkaProducer handle
> IMHO (it's what I'm doing in the JmsIO).
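>
> Something like this rough sketch (illustrative only, not the actual JmsIO
> or KafkaIO code; the config map and topic name are placeholders) would
> keep one producer per DoFn instance instead of one per bundle:
>
> import java.util.Map;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> class ReusedProducerKafkaWriter extends DoFn<KV<String, String>, Void> {
>   // Assumed to contain bootstrap.servers plus String serializer classes.
>   private final Map<String, Object> producerConfig;
>   private transient KafkaProducer<String, String> producer;
>
>   ReusedProducerKafkaWriter(Map<String, Object> producerConfig) {
>     this.producerConfig = producerConfig;
>   }
>
>   @Setup
>   public void setup() {
>     // Created once per DoFn instance and reused across bundles.
>     producer = new KafkaProducer<>(producerConfig);
>   }
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     producer.send(new ProducerRecord<>("my-topic",
>         c.element().getKey(), c.element().getValue()));
>   }
>
>   @FinishBundle
>   public void finishBundle() {
>     // Flush per bundle so the records are on the broker, but keep the
>     // producer open.
>     producer.flush();
>   }
>
>   @Teardown
>   public void teardown() {
>     // Closed only when the DoFn instance itself is discarded.
>     if (producer != null) {
>       producer.close();
>     }
>   }
> }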
>
> I'll keep you posted.
>
> Regards
> JB
>
> On 06/22/2016 06:12 PM, Jesse Anderson wrote:
> > The KafkaIO KafkaProducer seems to be closing after every send. On the
> > next send, the KafkaProducer is opened again.
> >
> > Here is the log output:
> > 2016-06-22 11:04:49,356 WARN  KafkaIO:714 - Looks like generateSplits()
> > is not called. Generate single split.
> > 2016-06-22 11:04:49,399 INFO  ConsumerConfig:165 - ConsumerConfig values:
> > metric.reporters = []
> > metadata.max.age.ms = 300000
> > value.deserializer = class
> > org.apache.kafka.common.serialization.ByteArrayDeserializer
> > group.id =
> > partition.assignment.strategy =
> > [org.apache.kafka.clients.consumer.RangeAssignor]
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > max.partition.fetch.bytes = 1048576
> > bootstrap.servers = [broker1:9092]
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > ssl.keystore.type = JKS
> > ssl.trustmanager.algorithm = PKIX
> > enable.auto.commit = false
> > ssl.key.password = null
> > fetch.max.wait.ms = 500
> > sasl.kerberos.min.time.before.relogin = 60000
> > connections.max.idle.ms = 540000
> > ssl.truststore.password = null
> > session.timeout.ms = 30000
> > metrics.num.samples = 2
> > client.id =
> > ssl.endpoint.identification.algorithm = null
> > key.deserializer = class
> > org.apache.kafka.common.serialization.ByteArrayDeserializer
> > ssl.protocol = TLS
> > check.crcs = true
> > request.timeout.ms = 40000
> > ssl.provider = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > ssl.keystore.location = null
> > heartbeat.interval.ms = 3000
> > auto.commit.interval.ms = 5000
> > receive.buffer.bytes = 524288
> > ssl.cipher.suites = null
> > ssl.truststore.type = JKS
> > security.protocol = PLAINTEXT
> > ssl.truststore.location = null
> > ssl.keystore.password = null
> > ssl.keymanager.algorithm = SunX509
> > metrics.sample.window.ms = 30000
> > fetch.min.bytes = 1
> > send.buffer.bytes = 131072
> > auto.offset.reset = earliest
> >
> > 2016-06-22 11:04:49,681 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:04:49,681 INFO  AppInfoParser:83 - Kafka commitId :
> > 23c69d62a0cabf06
> > 2016-06-22 11:04:49,865 INFO  KafkaIO:692 - Partitions assigned to split
> > 0 (total 1): eventsim-0
> > 2016-06-22 11:04:49,921 INFO  ConsumerConfig:165 - ConsumerConfig values:
> > metric.reporters = []
> > metadata.max.age.ms = 300000
> > value.deserializer = class
> > org.apache.kafka.common.serialization.ByteArrayDeserializer
> > group.id =
> > partition.assignment.strategy =
> > [org.apache.kafka.clients.consumer.RangeAssignor]
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > max.partition.fetch.bytes = 1048576
> > bootstrap.servers = [broker1:9092]
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > ssl.keystore.type = JKS
> > ssl.trustmanager.algorithm = PKIX
> > enable.auto.commit = false
> > ssl.key.password = null
> > fetch.max.wait.ms = 500
> > sasl.kerberos.min.time.before.relogin = 60000
> > connections.max.idle.ms = 540000
> > ssl.truststore.password = null
> > session.timeout.ms = 30000
> > metrics.num.samples = 2
> > client.id =
> > ssl.endpoint.identification.algorithm = null
> > key.deserializer = class
> > org.apache.kafka.common.serialization.ByteArrayDeserializer
> > ssl.protocol = TLS
> > check.crcs = true
> > request.timeout.ms = 40000
> > ssl.provider = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > ssl.keystore.location = null
> > heartbeat.interval.ms = 3000
> > auto.commit.interval.ms = 5000
> > receive.buffer.bytes = 524288
> > ssl.cipher.suites = null
> > ssl.truststore.type = JKS
> > security.protocol = PLAINTEXT
> > ssl.truststore.location = null
> > ssl.keystore.password = null
> > ssl.keymanager.algorithm = SunX509
> > metrics.sample.window.ms = 30000
> > fetch.min.bytes = 1
> > send.buffer.bytes = 131072
> > auto.offset.reset = earliest
> >
> > 2016-06-22 11:04:49,924 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:04:49,924 INFO  AppInfoParser:83 - Kafka commitId :
> > 23c69d62a0cabf06
> > 2016-06-22 11:04:49,926 INFO  KafkaIO:937 - Reader-0: resuming
> > eventsim-0 at default offset
> > 2016-06-22 11:04:49,934 INFO  ConsumerConfig:165 - ConsumerConfig values:
> > metric.reporters = []
> > metadata.max.age.ms = 300000
> > value.deserializer = class
> > org.apache.kafka.common.serialization.ByteArrayDeserializer
> > group.id = Reader-0_offset_consumer_698533492_none
> > partition.assignment.strategy =
> > [org.apache.kafka.clients.consumer.RangeAssignor]
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > max.partition.fetch.bytes = 1048576
> > bootstrap.servers = [broker1:9092]
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > ssl.keystore.type = JKS
> > ssl.trustmanager.algorithm = PKIX
> > enable.auto.commit = false
> > ssl.key.password = null
> > fetch.max.wait.ms = 500
> > sasl.kerberos.min.time.before.relogin = 60000
> > connections.max.idle.ms = 540000
> > ssl.truststore.password = null
> > session.timeout.ms = 30000
> > metrics.num.samples = 2
> > client.id =
> > ssl.endpoint.identification.algorithm = null
> > key.deserializer = class
> > org.apache.kafka.common.serialization.ByteArrayDeserializer
> > ssl.protocol = TLS
> > check.crcs = true
> > request.timeout.ms = 40000
> > ssl.provider = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > ssl.keystore.location = null
> > heartbeat.interval.ms = 3000
> > auto.commit.interval.ms = 5000
> > receive.buffer.bytes = 524288
> > ssl.cipher.suites = null
> > ssl.truststore.type = JKS
> > security.protocol = PLAINTEXT
> > ssl.truststore.location = null
> > ssl.keystore.password = null
> > ssl.keymanager.algorithm = SunX509
> > metrics.sample.window.ms = 30000
> > fetch.min.bytes = 1
> > send.buffer.bytes = 131072
> > auto.offset.reset = earliest
> >
> > 2016-06-22 11:04:49,941 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:04:49,942 INFO  AppInfoParser:83 - Kafka commitId :
> > 23c69d62a0cabf06
> > 2016-06-22 11:04:50,819 INFO  AbstractCoordinator:529 - Marking the
> > coordinator 2147483647 dead.
> > 2016-06-22 11:04:50,884 INFO  AbstractCoordinator:529 - Marking the
> > coordinator 2147483647 dead.
> > 2016-06-22 11:04:50,931 INFO  AbstractCoordinator:529 - Marking the
> > coordinator 2147483647 dead.
> > 2016-06-22 11:04:50,992 INFO  AbstractCoordinator:529 - Marking the
> > coordinator 2147483647 dead.
> > 2016-06-22 11:04:51,061 INFO  AbstractCoordinator:529 - Marking the
> > coordinator 2147483647 dead.
> > 2016-06-22 11:04:51,212 INFO  KafkaIO:1013 - Reader-0: first record
> offset 0
> > HERE
> > HERE
> > HERE
> > 2016-06-22 11:04:51,529 INFO  ProducerConfig:165 - ProducerConfig values:
> > compression.type = none
> > metric.reporters = []
> > metadata.max.age.ms = 300000
> > metadata.fetch.timeout.ms = 60000
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > bootstrap.servers = [broker1:9092]
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > buffer.memory = 33554432
> > timeout.ms = 30000
> > key.serializer = class
> > org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > ssl.keystore.type = JKS
> > ssl.trustmanager.algorithm = PKIX
> > block.on.buffer.full = false
> > ssl.key.password = null
> > max.block.ms = 60000
> > sasl.kerberos.min.time.before.relogin = 60000
> > connections.max.idle.ms = 540000
> > ssl.truststore.password = null
> > max.in.flight.requests.per.connection = 5
> > metrics.num.samples = 2
> > client.id =
> > ssl.endpoint.identification.algorithm = null
> > ssl.protocol = TLS
> > request.timeout.ms = 30000
> > ssl.provider = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > acks = 1
> > batch.size = 16384
> > ssl.keystore.location = null
> > receive.buffer.bytes = 32768
> > ssl.cipher.suites = null
> > ssl.truststore.type = JKS
> > security.protocol = PLAINTEXT
> > retries = 3
> > max.request.size = 1048576
> > value.serializer = class
> > org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> > ssl.truststore.location = null
> > ssl.keystore.password = null
> > ssl.keymanager.algorithm = SunX509
> > metrics.sample.window.ms = 30000
> > partitioner.class = class
> > org.apache.kafka.clients.producer.internals.DefaultPartitioner
> > send.buffer.bytes = 131072
> > linger.ms = 0
> >
> > 2016-06-22 11:04:51,574 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:04:51,574 INFO  AppInfoParser:83 - Kafka commitId :
> > 23c69d62a0cabf06
> > 2016-06-22 11:04:51,671 WARN  NetworkClient:582 - Error while fetching
> > metadata with correlation id 0 : {eventsimoutput=LEADER_NOT_AVAILABLE}
> > 2016-06-22 11:04:51,797 INFO  KafkaProducer:613 - Closing the Kafka
> > producer with timeoutMillis = 9223372036854775807 ms.
> > HERE
> > 2016-06-22 11:05:13,247 INFO  ProducerConfig:165 - ProducerConfig values:
> > compression.type = none
> > metric.reporters = []
> > metadata.max.age.ms = 300000
> > metadata.fetch.timeout.ms = 60000
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > bootstrap.servers = [broker1:9092]
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > buffer.memory = 33554432
> > timeout.ms = 30000
> > key.serializer = class
> > org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > ssl.keystore.type = JKS
> > ssl.trustmanager.algorithm = PKIX
> > block.on.buffer.full = false
> > ssl.key.password = null
> > max.block.ms = 60000
> > sasl.kerberos.min.time.before.relogin = 60000
> > connections.max.idle.ms = 540000
> > ssl.truststore.password = null
> > max.in.flight.requests.per.connection = 5
> > metrics.num.samples = 2
> > client.id =
> > ssl.endpoint.identification.algorithm = null
> > ssl.protocol = TLS
> > request.timeout.ms = 30000
> > ssl.provider = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > acks = 1
> > batch.size = 16384
> > ssl.keystore.location = null
> > receive.buffer.bytes = 32768
> > ssl.cipher.suites = null
> > ssl.truststore.type = JKS
> > security.protocol = PLAINTEXT
> > retries = 3
> > max.request.size = 1048576
> > value.serializer = class
> > org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> > ssl.truststore.location = null
> > ssl.keystore.password = null
> > ssl.keymanager.algorithm = SunX509
> > metrics.sample.window.ms = 30000
> > partitioner.class = class
> > org.apache.kafka.clients.producer.internals.DefaultPartitioner
> > send.buffer.bytes = 131072
> > linger.ms = 0
> >
> > 2016-06-22 11:05:13,252 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:05:13,252 INFO  AppInfoParser:83 - Kafka commitId :
> > 23c69d62a0cabf06
> > 2016-06-22 11:05:13,361 INFO  KafkaProducer:613 - Closing the Kafka
> > producer with timeoutMillis = 9223372036854775807 ms.
> > HERE
> > 2016-06-22 11:05:15,003 INFO  ProducerConfig:165 - ProducerConfig values:
> > compression.type = none
> > metric.reporters = []
> > metadata.max.age.ms = 300000
> > metadata.fetch.timeout.ms = 60000
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > bootstrap.servers = [broker1:9092]
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > buffer.memory = 33554432
> > timeout.ms = 30000
> > key.serializer = class
> > org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > ssl.keystore.type = JKS
> > ssl.trustmanager.algorithm = PKIX
> > block.on.buffer.full = false
> > ssl.key.password = null
> > max.block.ms = 60000
> > sasl.kerberos.min.time.before.relogin = 60000
> > connections.max.idle.ms = 540000
> > ssl.truststore.password = null
> > max.in.flight.requests.per.connection = 5
> > metrics.num.samples = 2
> > client.id =
> > ssl.endpoint.identification.algorithm = null
> > ssl.protocol = TLS
> > request.timeout.ms = 30000
> > ssl.provider = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > acks = 1
> > batch.size = 16384
> > ssl.keystore.location = null
> > receive.buffer.bytes = 32768
> > ssl.cipher.suites = null
> > ssl.truststore.type = JKS
> > security.protocol = PLAINTEXT
> > retries = 3
> > max.request.size = 1048576
> > value.serializer = class
> > org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> > ssl.truststore.location = null
> > ssl.keystore.password = null
> > ssl.keymanager.algorithm = SunX509
> > metrics.sample.window.ms = 30000
> > partitioner.class = class
> > org.apache.kafka.clients.producer.internals.DefaultPartitioner
> > send.buffer.bytes = 131072
> > linger.ms = 0
> >
> > 2016-06-22 11:05:15,008 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:05:15,008 INFO  AppInfoParser:83 - Kafka commitId :
> > 23c69d62a0cabf06
> > 2016-06-22 11:05:15,120 INFO  KafkaProducer:613 - Closing the Kafka
> > producer with timeoutMillis = 9223372036854775807 ms.
> > HERE
> > 2016-06-22 11:06:20,735 INFO  ProducerConfig:165 - ProducerConfig values:
> > compression.type = none
> > metric.reporters = []
> > metadata.max.age.ms = 300000
> > metadata.fetch.timeout.ms = 60000
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > bootstrap.servers = [broker1:9092]
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > buffer.memory = 33554432
> > timeout.ms = 30000
> > key.serializer = class
> > org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > ssl.keystore.type = JKS
> > ssl.trustmanager.algorithm = PKIX
> > block.on.buffer.full = false
> > ssl.key.password = null
> > max.block.ms = 60000
> > sasl.kerberos.min.time.before.relogin = 60000
> > connections.max.idle.ms = 540000
> > ssl.truststore.password = null
> > max.in.flight.requests.per.connection = 5
> > metrics.num.samples = 2
> > client.id =
> > ssl.endpoint.identification.algorithm = null
> > ssl.protocol = TLS
> > request.timeout.ms = 30000
> > ssl.provider = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > acks = 1
> > batch.size = 16384
> > ssl.keystore.location = null
> > receive.buffer.bytes = 32768
> > ssl.cipher.suites = null
> > ssl.truststore.type = JKS
> > security.protocol = PLAINTEXT
> > retries = 3
> > max.request.size = 1048576
> > value.serializer = class
> > org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> > ssl.truststore.location = null
> > ssl.keystore.password = null
> > ssl.keymanager.algorithm = SunX509
> > metrics.sample.window.ms = 30000
> > partitioner.class = class
> > org.apache.kafka.clients.producer.internals.DefaultPartitioner
> > send.buffer.bytes = 131072
> > linger.ms = 0
> >
> > 2016-06-22 11:06:20,743 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> > 2016-06-22 11:06:20,744 INFO  AppInfoParser:83 - Kafka commitId :
> > 23c69d62a0cabf06
> > 2016-06-22 11:06:20,849 INFO  KafkaProducer:613 - Closing the Kafka
> > producer with timeoutMillis = 9223372036854775807 ms.
> >
> > Thanks,
> >
> > Jesse
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
