[ 
https://issues.apache.org/jira/browse/SAMZA-1371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502123#comment-16502123
 ] 

Zihao Zhang commented on SAMZA-1371:
------------------------------------

Hi [~nickpan47],
 We ran into the same problem with the new version of Samza, 0.14.1. 
Containers get stuck at the same offset for given partitions on specific Kafka 
brokers. (Partitions for the test topics have not been reassigned since my last 
test.) 
 According to my local test, there is a good chance that bumping the Kafka 
clients dependency up to 0.11 should solve the stuck container issue. After a 
rough investigation, I suspect that one class in package 
org.apache.samza.system.kafka, *DefaultFetchSimpleConsumer.scala*, causes 
this problem. This class extends *SimpleConsumer* (package kafka.consumer), 
which has been deprecated since Kafka version 0.11.0.0 and will be removed in 
a future release according to Kafka's comments, which suggest using 
*org.apache.kafka.clients.consumer.KafkaConsumer* instead. In my local 
test, I used *KafkaConsumer* (version 0.11.0.2) to fetch message data, and the 
stuck offset for the affected partitions was skipped instead of hanging forever.
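
For anyone trying to confirm the same symptom, here is a minimal sketch of how 
a stuck partition could be told apart from an idle one. It is not Samza or 
Kafka code: the class StuckPartitionCheck, the Sample type and the findStuck 
method are hypothetical names, and the offsets in main are made-up 
illustration values. It assumes you already collect, per partition, periodic 
pairs of (consumer offset, log end offset) from your own metrics or tooling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Samza or Kafka.
class StuckPartitionCheck {

    /** One observation of a partition: consumer position and log end offset. */
    static final class Sample {
        final long consumerOffset;
        final long logEndOffset;

        Sample(long consumerOffset, long logEndOffset) {
            this.consumerOffset = consumerOffset;
            this.logEndOffset = logEndOffset;
        }
    }

    /**
     * Returns the partitions whose consumer offset did not change over the
     * last minSamples observations while data was still pending (the log end
     * offset was ahead of the consumer offset in every one of them).
     */
    static List<String> findStuck(Map<String, List<Sample>> samplesByPartition,
                                  int minSamples) {
        List<String> stuck = new ArrayList<>();
        for (Map.Entry<String, List<Sample>> entry : samplesByPartition.entrySet()) {
            List<Sample> all = entry.getValue();
            if (minSamples < 1 || all.size() < minSamples) {
                continue; // not enough history to decide
            }
            List<Sample> recent = all.subList(all.size() - minSamples, all.size());
            long first = recent.get(0).consumerOffset;
            boolean frozen = true;
            boolean lagging = true;
            for (Sample s : recent) {
                if (s.consumerOffset != first) {
                    frozen = false; // the consumer made progress
                }
                if (s.logEndOffset <= s.consumerOffset) {
                    lagging = false; // nothing left to read, so merely idle
                }
            }
            if (frozen && lagging) {
                stuck.add(entry.getKey());
            }
        }
        return stuck;
    }

    public static void main(String[] args) {
        // Made-up sample data; a TreeMap keeps the output order deterministic.
        Map<String, List<Sample>> samples = new TreeMap<>();
        samples.put("testTopic-1", Arrays.asList(
                new Sample(10, 20), new Sample(15, 25), new Sample(20, 30)));
        samples.put("testTopic-2", Arrays.asList(
                new Sample(42, 50), new Sample(42, 55), new Sample(42, 60)));
        samples.put("testTopic-3", Arrays.asList(
                new Sample(7, 7), new Sample(7, 7), new Sample(7, 7)));
        System.out.println(findStuck(samples, 3)); // prints [testTopic-2]
    }
}
```

The lag check is there so that a partition with no new messages is not 
reported as stuck; only a frozen offset with pending data matches the 
behaviour described above.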

So I suspect this is why the new version of Samza (0.14.1), with its 
kafka-clients dependency at 0.11.0.2, does not solve the stuck container issue. 

Do you have any other thoughts on this bug? Does the Samza team have any plan 
to replace the usage of the deprecated *SimpleConsumer* class? 

Thanks.

> Some Samza Containers get stuck at "Starting BrokerProxy for 
> hostname:portnum" while others seem to be fine
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1371
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1371
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.11.0, 0.12.0
>         Environment: Samza version: 0.11, 0.12
> Kafka version: 0.11.0.0
>            Reporter: Ak Ka
>            Priority: Blocker
>         Attachments: stdout.log, thread_dump.txt
>
>
> We have multiple Samza apps using local store that have this issue. Some 
> containers get stuck on "Starting BrokerProxy for hostname:portnum" while 
> others seem to work as expected.  
> Here is the log:
> stuck:
> ```
> [...]
> 2017-07-25 17:11:26.546 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
> 2017-07-25 17:11:26.547 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Validating offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]
> 2017-07-25 17:11:26.648 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Able to successfully read from offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,2]. 
> Using it to instantiate consumer.
> 2017-07-25 17:11:26.649 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Starting BrokerProxy for hostname:portnum
> // it's dead, Jim
> ```
> healthy:
> ```
> [...]
> 2017-07-25 17:11:26.920 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Creating new SimpleConsumer for host hostname:portnum for system kafka
> 2017-07-25 17:11:26.921 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Validating offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]
> 2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.GetOffset [INFO] 
> Able to successfully read from offset 0 for topic and partition 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_matcherValidation,1]. 
> Using it to instantiate consumer.
> 2017-07-25 17:11:27.023 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Starting BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.194 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.239 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.DefaultFetchSimpleConsumer [INFO] Reconnect due 
> to socket error: java.nio.channels.ClosedChannelException
> 2017-07-25 17:11:29.244 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [WARN] Restarting consumer due to 
> java.nio.channels.ClosedChannelException. Releasing ownership of all 
> partitions, and restarting consumer. Turn on debugging to get a full stack 
> trace.
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.KafkaSystemConsumer [INFO] Abdicating for 
> [prod.localStateChangeLog.prod.AlertsOrganizerInstant_alertSetting,1]
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.KafkaSystemConsumer [INFO] Refreshing brokers 
> for: 
> Map([prod.localStateChangeLog.prod.AlertsOrganizerInstant_alertSetting,1] -> 
> 13572)
> 2017-07-25 17:11:29.247 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to 
> interrupt.
> 2017-07-25 17:11:29.247 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.248 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.265 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to 
> interrupt.
> 2017-07-25 17:11:29.265 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.265 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.523 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to 
> interrupt.
> 2017-07-25 17:11:29.524 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.524 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.601 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to 
> interrupt.
> 2017-07-25 17:11:29.602 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] Shutting down BrokerProxy for hostname:portnum
> 2017-07-25 17:11:29.602 [main] org.apache.samza.system.kafka.BrokerProxy 
> [INFO] closing simple consumer...
> 2017-07-25 17:11:29.663 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 
> hostname:portnum for client samza_consumer-prod_AlertsOrganizerInstant-1] 
> org.apache.samza.system.kafka.BrokerProxy [INFO] Shutting down due to 
> interrupt.
> 2017-07-25 17:11:29.668 [main] org.apache.samza.container.SamzaContainer 
> [INFO] Starting host statistics monitor
> 2017-07-25 17:11:29.670 [main] org.apache.samza.container.SamzaContainer 
> [INFO] Registering task instances with producers.
> 2017-07-25 17:11:29.674 [main] org.apache.samza.container.SamzaContainer 
> [INFO] Starting producer multiplexer.
> 2017-07-25 17:11:29.675 [main] org.apache.samza.container.SamzaContainer 
> [INFO] Initializing stream tasks.
> 2017-07-25 17:11:29.676 [main] 
> com.company.samza.app.companyStreamingAppWrapper [INFO] Initializing instance 
> of streaming application
> 2017-07-25 17:11:29.681 [main] 
> com.company.samza.app.companyStreamingAppWrapper [INFO] First initialization. 
> Setting up Guice container with configuration 
> companyStreamingAppWrapperConfiguration{company.app.name=AlertsOrganizerInstant,
>  company.appgroup=aws, company.env=prod, 
> company.guice.module=com.company.notifications.Alerts.organizer..AlertsOrganizerModule}
> 2017-07-25 17:11:30.118 [main] com.company.config.guice.configModule [INFO] 
> configModule loaded requested override file 
> '/storage/data/secure/config/AnalyticsServiceClient.cfg'
> 2017-07-25 17:11:30.480 [main] 
> com.company.samza.dataService.SamzaSessionFactoriesModule [INFO] Loading prod 
> dbConfig from /data/config/prod.database.properties
> // Hibernate stuff (i.e. our code is hit)
> ```


