[ https://issues.apache.org/jira/browse/SAMZA-560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317199#comment-14317199 ]

Yan Fang commented on SAMZA-560:
--------------------------------

Not sure what blocks the new producer from fetching the metadata. If the producer is not created by the StreamAppender (that is, it's created inside Container.class), it can get the metadata.

> StreamAppender not working after upgrading kafka producer API
> -------------------------------------------------------------
>
>                 Key: SAMZA-560
>                 URL: https://issues.apache.org/jira/browse/SAMZA-560
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Yan Fang
>            Assignee: Navina Ramesh
>
> After SAMZA-227, StreamAppender is not working. I dug into it a little but
> still cannot figure it out.
> It throws an exception:
> {code}
> Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
> {code}
> Log:
> {code}
> 2015-02-11 12:19:15 SamzaAppMaster$ [INFO] got container id: container_1423528474084_0023_02_000001
> 2015-02-11 12:19:15 KafkaSystemProducer [TRACE] Enqueueing message: log4j-log, OutgoingMessageEnvelope [systemStream=SystemStream [system=kafka2, stream=__samza_printout_task1_1_logs], keySerializerName=null, messageSerializerName=null, partitionKey=[B@155a6bd1, key=[B@155a6bd1, message=[B@635c714a].
> 2015-02-11 12:19:15 KafkaSystemProducer [INFO] Creating a new producer for system kafka2.
> 2015-02-11 12:19:15 ProducerConfig [INFO] ProducerConfig values:
>     block.on.buffer.full = true
>     retry.backoff.ms = 100
>     buffer.memory = 33554432
>     batch.size = 16384
>     metrics.sample.window.ms = 30000
>     metadata.max.age.ms = 300000
>     receive.buffer.bytes = 32768
>     timeout.ms = 30000
>     max.in.flight.requests.per.connection = 1
>     metric.reporters = []
>     bootstrap.servers = [localhost:9092]
>     client.id = samza_producer-printout_task1-1-1423685955347-0
>     compression.type = none
>     retries = 2147483647
>     max.request.size = 1048576
>     send.buffer.bytes = 131072
>     acks = 1
>     reconnect.backoff.ms = 10
>     linger.ms = 0
>     metrics.num.samples = 2
>     metadata.fetch.timeout.ms = 60000
> 2015-02-11 12:19:15 KafkaProducer [TRACE] Starting the Kafka producer
> 2015-02-11 12:19:15 Metadata [DEBUG] Updated cluster metadata version 1 to Cluster(nodes = [Node(localhost, 9092)], partitions = [])
> 2015-02-11 12:19:15 KafkaProducer [DEBUG] Kafka producer started
> 2015-02-11 12:19:15 KafkaSystemProducer [DEBUG] Created a new producer for system kafka2.
> 2015-02-11 12:19:15 KafkaProducer [TRACE] Requesting metadata update for topic __samza_printout_task1_1_logs.
> 2015-02-11 12:19:15 Sender [DEBUG] Starting Kafka producer I/O thread.
> 2015-02-11 12:20:15 KafkaSystemProducer [TRACE] Enqueueing message: log4j-log, OutgoingMessageEnvelope [systemStream=SystemStream [system=kafka2, stream=__samza_printout_task1_1_logs], keySerializerName=null, messageSerializerName=null, partitionKey=[B@609548c3, key=[B@609548c3, message=[B@68dc2bbe].
> 2015-02-11 12:20:15 KafkaProducer [TRACE] Requesting metadata update for topic __samza_printout_task1_1_logs.
> {code}
> I also tested configuring two Kafka systems in one job, which worked.
> I really cannot figure out why the KafkaProducer created through the
> StreamAppender does not work.
> Any ideas?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)