Yeah, this is pretty bad right now. There is a proposal to add an API to DoFn
that makes it easy to keep state across multiple bundles.

I think the current sink is pretty unusable in this state. We could use a
cache of producers (that expire if unused for a few seconds).
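To sketch what such a cache might look like (a rough, hypothetical illustration, not the actual KafkaIO fix): keep one producer per config key, reuse it across bundles, and close it only after it has been idle for a few seconds. `ProducerCache` and its method names below are made up for this sketch; `AutoCloseable` stands in for KafkaProducer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch: cache producers keyed by their config, closing
// any producer that has gone unused for longer than the TTL.
public class ProducerCache<P extends AutoCloseable> {
    private static final class Entry<P> {
        final P producer;
        volatile long lastUsedNanos;
        Entry(P producer, long now) { this.producer = producer; this.lastUsedNanos = now; }
    }

    private final Map<String, Entry<P>> cache = new ConcurrentHashMap<>();
    private final long ttlNanos;

    public ProducerCache(long ttlMillis) {
        this.ttlNanos = ttlMillis * 1_000_000L;
    }

    // Return the cached producer for this config key, creating one only if absent.
    public P get(String configKey, Supplier<P> factory) {
        long now = System.nanoTime();
        Entry<P> e = cache.computeIfAbsent(configKey, k -> new Entry<>(factory.get(), now));
        e.lastUsedNanos = now;
        return e.producer;
    }

    // Close and evict any producer idle longer than the TTL
    // (would be called periodically, e.g. from a scheduled task).
    public void evictIdle() {
        long now = System.nanoTime();
        cache.entrySet().removeIf(entry -> {
            if (now - entry.getValue().lastUsedNanos > ttlNanos) {
                try { entry.getValue().producer.close(); } catch (Exception ignored) {}
                return true;
            }
            return false;
        });
    }

    public int size() { return cache.size(); }
}
```

With something like this, repeated sends within a bundle (and across nearby bundles) would reuse one producer instead of paying the open/close cost on every record, while an idle worker still releases its connections after the TTL.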

On Wed, Jun 22, 2016 at 9:12 AM, Jesse Anderson <[email protected]>
wrote:

> The KafkaIO KafkaProducer seems to be closing after every send. On the
> next send, the KafkaProducer is opened again.
>
> Here is the log output:
> 2016-06-22 11:04:49,356 WARN  KafkaIO:714 - Looks like generateSplits() is
> not called. Generate single split.
> 2016-06-22 11:04:49,399 INFO  ConsumerConfig:165 - ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 300000
> value.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id =
> partition.assignment.strategy =
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [broker1:9092]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> ssl.truststore.password = null
> session.timeout.ms = 30000
> metrics.num.samples = 2
> client.id =
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 40000
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 524288
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
>
> 2016-06-22 11:04:49,681 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:04:49,681 INFO  AppInfoParser:83 - Kafka commitId :
> 23c69d62a0cabf06
> 2016-06-22 11:04:49,865 INFO  KafkaIO:692 - Partitions assigned to split 0
> (total 1): eventsim-0
> 2016-06-22 11:04:49,921 INFO  ConsumerConfig:165 - ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 300000
> value.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id =
> partition.assignment.strategy =
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [broker1:9092]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> ssl.truststore.password = null
> session.timeout.ms = 30000
> metrics.num.samples = 2
> client.id =
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 40000
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 524288
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
>
> 2016-06-22 11:04:49,924 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:04:49,924 INFO  AppInfoParser:83 - Kafka commitId :
> 23c69d62a0cabf06
> 2016-06-22 11:04:49,926 INFO  KafkaIO:937 - Reader-0: resuming eventsim-0
> at default offset
> 2016-06-22 11:04:49,934 INFO  ConsumerConfig:165 - ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 300000
> value.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id = Reader-0_offset_consumer_698533492_none
> partition.assignment.strategy =
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [broker1:9092]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> ssl.truststore.password = null
> session.timeout.ms = 30000
> metrics.num.samples = 2
> client.id =
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 40000
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 524288
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
>
> 2016-06-22 11:04:49,941 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:04:49,942 INFO  AppInfoParser:83 - Kafka commitId :
> 23c69d62a0cabf06
> 2016-06-22 11:04:50,819 INFO  AbstractCoordinator:529 - Marking the
> coordinator 2147483647 dead.
> 2016-06-22 11:04:50,884 INFO  AbstractCoordinator:529 - Marking the
> coordinator 2147483647 dead.
> 2016-06-22 11:04:50,931 INFO  AbstractCoordinator:529 - Marking the
> coordinator 2147483647 dead.
> 2016-06-22 11:04:50,992 INFO  AbstractCoordinator:529 - Marking the
> coordinator 2147483647 dead.
> 2016-06-22 11:04:51,061 INFO  AbstractCoordinator:529 - Marking the
> coordinator 2147483647 dead.
> 2016-06-22 11:04:51,212 INFO  KafkaIO:1013 - Reader-0: first record offset
> 0
> HERE
> HERE
> HERE
> 2016-06-22 11:04:51,529 INFO  ProducerConfig:165 - ProducerConfig values:
> compression.type = none
> metric.reporters = []
> metadata.max.age.ms = 300000
> metadata.fetch.timeout.ms = 60000
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> bootstrap.servers = [broker1:9092]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> buffer.memory = 33554432
> timeout.ms = 30000
> key.serializer = class
> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> block.on.buffer.full = false
> ssl.key.password = null
> max.block.ms = 60000
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> ssl.truststore.password = null
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> client.id =
> ssl.endpoint.identification.algorithm = null
> ssl.protocol = TLS
> request.timeout.ms = 30000
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> acks = 1
> batch.size = 16384
> ssl.keystore.location = null
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> retries = 3
> max.request.size = 1048576
> value.serializer = class
> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
> send.buffer.bytes = 131072
> linger.ms = 0
>
> 2016-06-22 11:04:51,574 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:04:51,574 INFO  AppInfoParser:83 - Kafka commitId :
> 23c69d62a0cabf06
> 2016-06-22 11:04:51,671 WARN  NetworkClient:582 - Error while fetching
> metadata with correlation id 0 : {eventsimoutput=LEADER_NOT_AVAILABLE}
> 2016-06-22 11:04:51,797 INFO  KafkaProducer:613 - Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
> HERE
> 2016-06-22 11:05:13,247 INFO  ProducerConfig:165 - ProducerConfig values:
> compression.type = none
> metric.reporters = []
> metadata.max.age.ms = 300000
> metadata.fetch.timeout.ms = 60000
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> bootstrap.servers = [broker1:9092]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> buffer.memory = 33554432
> timeout.ms = 30000
> key.serializer = class
> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> block.on.buffer.full = false
> ssl.key.password = null
> max.block.ms = 60000
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> ssl.truststore.password = null
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> client.id =
> ssl.endpoint.identification.algorithm = null
> ssl.protocol = TLS
> request.timeout.ms = 30000
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> acks = 1
> batch.size = 16384
> ssl.keystore.location = null
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> retries = 3
> max.request.size = 1048576
> value.serializer = class
> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
> send.buffer.bytes = 131072
> linger.ms = 0
>
> 2016-06-22 11:05:13,252 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:05:13,252 INFO  AppInfoParser:83 - Kafka commitId :
> 23c69d62a0cabf06
> 2016-06-22 11:05:13,361 INFO  KafkaProducer:613 - Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
> HERE
> 2016-06-22 11:05:15,003 INFO  ProducerConfig:165 - ProducerConfig values:
> compression.type = none
> metric.reporters = []
> metadata.max.age.ms = 300000
> metadata.fetch.timeout.ms = 60000
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> bootstrap.servers = [broker1:9092]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> buffer.memory = 33554432
> timeout.ms = 30000
> key.serializer = class
> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> block.on.buffer.full = false
> ssl.key.password = null
> max.block.ms = 60000
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> ssl.truststore.password = null
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> client.id =
> ssl.endpoint.identification.algorithm = null
> ssl.protocol = TLS
> request.timeout.ms = 30000
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> acks = 1
> batch.size = 16384
> ssl.keystore.location = null
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> retries = 3
> max.request.size = 1048576
> value.serializer = class
> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
> send.buffer.bytes = 131072
> linger.ms = 0
>
> 2016-06-22 11:05:15,008 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:05:15,008 INFO  AppInfoParser:83 - Kafka commitId :
> 23c69d62a0cabf06
> 2016-06-22 11:05:15,120 INFO  KafkaProducer:613 - Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
> HERE
> 2016-06-22 11:06:20,735 INFO  ProducerConfig:165 - ProducerConfig values:
> compression.type = none
> metric.reporters = []
> metadata.max.age.ms = 300000
> metadata.fetch.timeout.ms = 60000
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> bootstrap.servers = [broker1:9092]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> buffer.memory = 33554432
> timeout.ms = 30000
> key.serializer = class
> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> block.on.buffer.full = false
> ssl.key.password = null
> max.block.ms = 60000
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> ssl.truststore.password = null
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> client.id =
> ssl.endpoint.identification.algorithm = null
> ssl.protocol = TLS
> request.timeout.ms = 30000
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> acks = 1
> batch.size = 16384
> ssl.keystore.location = null
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> retries = 3
> max.request.size = 1048576
> value.serializer = class
> org.apache.beam.sdk.io.kafka.KafkaIO$CoderBasedKafkaSerializer
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
> send.buffer.bytes = 131072
> linger.ms = 0
>
> 2016-06-22 11:06:20,743 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
> 2016-06-22 11:06:20,744 INFO  AppInfoParser:83 - Kafka commitId :
> 23c69d62a0cabf06
> 2016-06-22 11:06:20,849 INFO  KafkaProducer:613 - Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
>
> Thanks,
>
> Jesse
>
