[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593171#comment-14593171 ]

Dibyendu Bhattacharya commented on SPARK-8474:
----------------------------------------------

https://kafka.apache.org/08/configuration.html says of fetch.message.max.bytes:

"The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows, or else it is possible for the producer to send messages larger than the consumer can fetch."

So this is not per message; it is the total size of the messages you fetch in every FetchRequest built with FetchRequestBuilder.

[STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                 Key: SPARK-8474
                 URL: https://issues.apache.org/jira/browse/SPARK-8474
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.4.0
            Reporter: Dibyendu Bhattacharya
            Priority: Critical

The issue is: if Kafka holds variable-size messages ranging from a few KB to a few hundred KB, rate limiting by message count can cause a problem. Say message sizes are such that only 1000 messages fit within the default fetch.message.max.bytes limit (1 MB), while spark.streaming.kafka.maxRatePerPartition is set to 2000. With these settings, when KafkaRDD pulls messages for its offset range, it pulls only 1000 messages (limited by the size of the pull in the SimpleConsumer API), can never pull messages up to the desired untilOffset, and KafkaRDD fails at this assert:
assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
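To make the reported interaction concrete, here is a minimal, self-contained sketch of how a byte-capped fetch can return fewer messages than a count-based rate limit requests. This is not Spark's actual KafkaRDD code; `FetchSimulation`, `messagesPerFetch`, and the message sizes are hypothetical, chosen to mirror the 1 MB / 2000-message scenario in the report.

```scala
// Hypothetical simulation of a single SimpleConsumer fetch: messages are
// returned in order until adding the next one would exceed the byte cap
// (fetch.message.max.bytes).
object FetchSimulation {
  def messagesPerFetch(messageSizes: Seq[Int], fetchMaxBytes: Int): Int = {
    var bytes = 0
    var count = 0
    val it = messageSizes.iterator
    var full = false
    while (it.hasNext && !full) {
      val size = it.next()
      if (bytes + size > fetchMaxBytes) full = true  // cap reached: fetch ends here
      else { bytes += size; count += 1 }
    }
    count
  }

  def main(args: Array[String]): Unit = {
    val fetchMaxBytes = 1024 * 1024   // default fetch.message.max.bytes: 1 MB
    val sizes = Seq.fill(2000)(1100)  // 2000 requested messages of ~1.1 KB each (hypothetical)
    val pulled = messagesPerFetch(sizes, fetchMaxBytes)
    // Only 953 of the 2000 requested messages fit in one 1 MB fetch, so a
    // consumer that issued exactly one fetch per offset range would stop
    // short of untilOffset and trip the assert quoted above.
    println(s"pulled $pulled of ${sizes.length}")
  }
}
```

Whether KafkaRDD really issues only one fetch per offset range is the point debated in the comments below.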
[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593159#comment-14593159 ]

Sean Owen commented on SPARK-8474:
----------------------------------

The max fetch size is per message. It would not affect how many messages you can pull. Can you clarify what you mean?
[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593273#comment-14593273 ]

Dibyendu Bhattacharya commented on SPARK-8474:
----------------------------------------------

I got this problem just once and have not been able to reproduce it since. Here is the executor log from that occurrence. Not sure if this problem is related to some Kafka issue where the leader offset comes back wrong.

15/06/18 09:01:21 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/06/18 09:01:21 INFO SecurityManager: Changing view acls to: hadoop
15/06/18 09:01:21 INFO SecurityManager: Changing modify acls to: hadoop
15/06/18 09:01:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/18 09:01:22 INFO Slf4jLogger: Slf4jLogger started
15/06/18 09:01:22 INFO Remoting: Starting remoting
15/06/18 09:01:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@10.252.5.54:45553]
15/06/18 09:01:22 INFO Utils: Successfully started service 'driverPropsFetcher' on port 45553.
15/06/18 09:01:23 INFO SecurityManager: Changing view acls to: hadoop
15/06/18 09:01:23 INFO SecurityManager: Changing modify acls to: hadoop
15/06/18 09:01:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/06/18 09:01:23 INFO Slf4jLogger: Slf4jLogger started
15/06/18 09:01:23 INFO Remoting: Starting remoting
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/06/18 09:01:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@10.252.5.54:56579]
15/06/18 09:01:23 INFO Utils: Successfully started service 'sparkExecutor' on port 56579.
15/06/18 09:01:23 INFO DiskBlockManager: Created local directory at /mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/blockmgr-2badd56a-7877-44d7-bb67-c309935ce1ba
15/06/18 09:01:23 INFO MemoryStore: MemoryStore started with capacity 883.8 MB
15/06/18 09:01:23 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkDriver@10.252.5.113:52972/user/CoarseGrainedScheduler
15/06/18 09:01:23 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@10.252.5.54:49197/user/Worker
15/06/18 09:01:23 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@10.252.5.54:49197/user/Worker
15/06/18 09:01:23 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
15/06/18 09:01:23 INFO Executor: Starting executor ID 1 on host 10.252.5.54
15/06/18 09:01:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34554.
15/06/18 09:01:23 INFO NettyBlockTransferService: Server created on 34554
15/06/18 09:01:23 INFO BlockManagerMaster: Trying to register BlockManager
15/06/18 09:01:24 INFO BlockManagerMaster: Registered BlockManager
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 0
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 1
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 2
15/06/18 09:01:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/18 09:01:24 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/06/18 09:01:24 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
15/06/18 09:01:24 INFO Executor: Fetching http://10.252.5.113:54193/jars/LowlevelKafkaConsumer-assembly-1.0.jar with timestamp 1434618080212
15/06/18 09:01:24 INFO Utils: Fetching http://10.252.5.113:54193/jars/LowlevelKafkaConsumer-assembly-1.0.jar to /mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/fetchFileTemp4240791741464959275.tmp
15/06/18 09:01:25 INFO Utils: Copying /mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/19875585461434618080212_cache to /mnt1/spark/work/app-20150618090120-0029/1/./LowlevelKafkaConsumer-assembly-1.0.jar
15/06/18 09:01:25 INFO Executor: Adding file:/mnt1/spark/work/app-20150618090120-0029/1/./LowlevelKafkaConsumer-assembly-1.0.jar to class loader
15/06/18 09:01:25 INFO TorrentBroadcast: Started reading broadcast variable 0
15/06/18 09:01:25 INFO MemoryStore: ensureFreeSpace(1399) called with curMem=0, maxMem=926731468
15/06/18 09:01:25 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1399.0 B, free 883.8 MB)
15/06/18 09:01:25 INFO TorrentBroadcast:
[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593285#comment-14593285 ]

Dibyendu Bhattacharya commented on SPARK-8474:
----------------------------------------------

Yes, right. Maybe this is a false alarm; I did not see any issue with the logic. As I see it, KafkaRDD keeps pulling messages in chunks of fetch.message.max.bytes (1 MB) in every fetchBatch, and keeps doing so until it reaches the untilOffset, so I may be wrong here.

I got the issue once and have not been able to reproduce it since. I shared the executor trace from that run, and I can see some OffsetOutOfRange issue. Not sure how that came about, as I launched the receiver for the very first time, starting from the earliest offset. Just to mention, on all successive runs I never see output like the following from the shared log:

15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 0 offsets 0 - 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 1 offsets 0 - 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 2 offsets 0 - 2338

There must have been some problem getting the offset ranges, which look wrong to me. This is a very old topic, and its offsets cannot start from zero (0).
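The looping behaviour described in this last comment — repeated byte-capped fetches advancing requestOffset until untilOffset is reached — can be sketched as follows. This is a simplified model, not Spark's actual KafkaRDD iterator; `ChunkedFetch` and the fixed message size are assumptions made for illustration.

```scala
// Simplified model of a per-partition consumer loop: each fetch is capped at
// fetch.message.max.bytes, but fetches are issued repeatedly until
// requestOffset reaches untilOffset, so the offset-range assert holds.
object ChunkedFetch {
  // Returns the message count of each fetch needed to cover the offset range,
  // assuming a uniform message size (a simplification for illustration).
  def fetchUntil(fromOffset: Long, untilOffset: Long,
                 msgSize: Int, fetchMaxBytes: Int): Seq[Long] = {
    val perFetch = (fetchMaxBytes / msgSize).toLong  // messages per capped fetch
    var requestOffset = fromOffset
    val batches = Seq.newBuilder[Long]
    while (requestOffset < untilOffset) {
      val n = math.min(perFetch, untilOffset - requestOffset)
      batches += n
      requestOffset += n
    }
    // Analogue of KafkaRDD's assert(requestOffset == part.untilOffset, ...):
    // because we loop, the range is always fully covered.
    assert(requestOffset == untilOffset)
    batches.result()
  }
}
```

With 1.1 KB messages, a 1 MB cap, and a 2000-message range, the loop issues three fetches (953 + 953 + 94 messages) rather than failing after the first, which matches the "false alarm" conclusion above.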