I didn't have an SLF4J binding in my dependencies, so none of the Kafka
logging was being output. Adding one fixed that; see the sketch below.
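
For anyone hitting the same thing: SLF4J falls back to its no-op logger when
no binding is on the classpath. This is roughly what I added (assuming a
Maven build; slf4j-simple is just one binding, and pick whatever version is
current for you):

    <!-- Any SLF4J binding works; slf4j-simple shown as an example. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.21</version>
    </dependency>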

With logging enabled, this output repeats over and over:
[pool-2-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
[pool-2-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
[pool-2-thread-1] INFO org.apache.beam.sdk.io.kafka.KafkaIO - Reader-0: resuming eventsim-0 at default offset
[pool-2-thread-1] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = Reader-0_offset_consumer_1341721707_none
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [broker1:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 524288
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = earliest

[pool-2-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
[pool-2-thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
[pool-2-thread-1] WARN org.apache.beam.sdk.io.kafka.KafkaIO - Reader-0: getWatermark() : no records have been read yet.
[pool-77-thread-1] INFO org.apache.beam.sdk.io.kafka.KafkaIO - Reader-0: Returning from consumer pool loop
[pool-78-thread-1] WARN org.apache.beam.sdk.io.kafka.KafkaIO - Reader-0: exception while fetching latest offsets. ignored.
org.apache.kafka.common.errors.WakeupException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:324)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:272)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1299)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1106)
at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.updateLatestOffsets(KafkaIO.java:1057)
at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.access$1700(KafkaIO.java:752)
at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader$3.run(KafkaIO.java:965)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
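
As I understand it, WakeupException is what a blocking KafkaConsumer call
throws when another thread calls consumer.wakeup() on it, which is how
KafkaIO's offset-fetch thread gets interrupted. A minimal sketch of that
pattern (broker, topic, and partition are placeholders; this is my own
illustration, not KafkaIO's code):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("eventsim", 0);
    consumer.assign(Collections.singletonList(tp));

    // Some other thread can call consumer.wakeup() to interrupt a blocking call.
    try {
      long latest = consumer.position(tp); // may block on the network
    } catch (WakeupException e) {
      // Expected after wakeup(); KafkaIO logs this and moves on.
    }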

The output says the exception is being ignored, but it doesn't appear to be.
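
For reference, here is roughly the shape of my pipeline from the earlier
message below. Topic, broker, and the map logic are placeholders, and the
method names are from the KafkaIO API as I know it, so treat this as a
sketch rather than my exact code:

    import java.nio.charset.StandardCharsets;
    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    p.apply(KafkaIO.read()
            .withBootstrapServers("broker1:9092")
            .withTopics(ImmutableList.of("eventsim"))
            .withoutMetadata())               // PCollection<KV<byte[], byte[]>>
     .apply(MapElements.via(
         new SimpleFunction<KV<byte[], byte[]>, String>() {
           @Override
           public String apply(KV<byte[], byte[]> record) {
             // This is the function that never seems to get called.
             return new String(record.getValue(), StandardCharsets.UTF_8);
           }
         }));

    p.run();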

On Wed, Jun 8, 2016 at 4:12 PM Raghu Angadi <[email protected]> wrote:

> There is some logging at INFO level. E.g., you should see a log at line 692
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L692>
> on the client side (where you launch the app from), and at line 932 or 935
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L935>
> on the workers. Which runner are you using?
>
> On Wed, Jun 8, 2016 at 12:55 PM, Jesse Anderson <[email protected]>
> wrote:
>
>> Nothing gets logged out. Do I need to change a log level?
>>
>> On Wed, Jun 8, 2016, 3:54 PM Raghu Angadi <[email protected]> wrote:
>>
>>> On Wed, Jun 8, 2016 at 12:35 PM, Jesse Anderson <[email protected]>
>>> wrote:
>>>
>>>> In my code, I create a KafkaIO.read() that is processed by a
>>>> MapElements.via(). The lambda in the via() is never called. I've verified
>>>> there is data in the topic, and I've manually added data using the console
>>>> producer. Still, no data.
>>>
>>>
>>> Do you have logs from KafkaIO?
>>>
>>
>
