[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer

2015-06-19 Thread Dibyendu Bhattacharya (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593171#comment-14593171 ]

Dibyendu Bhattacharya commented on SPARK-8474:
--

https://kafka.apache.org/08/configuration.html says of fetch.message.max.bytes:

The number of bytes of messages to attempt to fetch for each topic-partition in 
each fetch request. These bytes will be read into memory for each partition, so 
this helps control the memory used by the consumer. The fetch request size must 
be at least as large as the maximum message size the server allows or else it 
is possible for the producer to send messages larger than the consumer can 
fetch.

This is not a per-message limit; it is the number of bytes you fetch per 
topic-partition in every FetchRequest built with FetchRequestBuilder.
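
For illustration, a minimal sketch of a fetch against the 0.8 SimpleConsumer, 
showing that the byte budget applies per topic-partition per FetchRequest 
(broker host, port, client id and offset are placeholders, not values from 
this issue):

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

val consumer = new SimpleConsumer("broker-host", 9092, 10000, 64 * 1024, "fetch-sketch")
// fetch.message.max.bytes bounds the bytes requested per topic-partition
// in a single FetchRequest, not the size of any individual message
val fetchSizeBytes = 1024 * 1024 // the 1 MB default discussed above
val request = new FetchRequestBuilder()
  .clientId("fetch-sketch")
  .addFetch("valid_subpub", 0, 0L, fetchSizeBytes)
  .build()
val messages = consumer.fetch(request).messageSet("valid_subpub", 0)
println(s"one request returned ${messages.size} messages")
consumer.close()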

 [STREAMING] Kafka DirectStream API stops receiving messages if collective 
 size of the messages specified in spark.streaming.kafka.maxRatePerPartition 
 exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
 -

 Key: SPARK-8474
 URL: https://issues.apache.org/jira/browse/SPARK-8474
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: Dibyendu Bhattacharya
Priority: Critical

 The issue is: if a Kafka topic holds variable-size messages ranging from a 
 few KB to a few hundred KB, rate limiting by message count can lead to a 
 problem. Say message sizes are such that under the default 
 fetch.message.max.bytes limit (1 MB) only 1000 messages can be pulled, while 
 spark.streaming.kafka.maxRatePerPartition is set to, say, 2000. With these 
 settings, when KafkaRDD pulls messages for its offset range, it only pulls 
 1000 messages (limited by the fetch size of the SimpleConsumer API), can 
 never reach the desired untilOffset, and fails at this assert in KafkaRDD:
 assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
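
To make the two knobs concrete, a hedged sketch of a direct stream configured 
with both settings, assuming the Spark 1.4 API (broker address, topic name and 
values are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("direct-stream-sketch")
  // count-based cap: at most this many messages per partition per batch
  .set("spark.streaming.kafka.maxRatePerPartition", "2000")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map(
  "metadata.broker.list" -> "broker-host:9092",
  // size-based cap: byte budget of each fetch request per partition;
  // raising it above the 1 MB default is the obvious knob to try if
  // large messages are suspected of starving a batch
  "fetch.message.max.bytes" -> (8 * 1024 * 1024).toString)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("valid_subpub"))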




[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer

2015-06-19 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593159#comment-14593159 ]

Sean Owen commented on SPARK-8474:
--

The max fetch size is per message. It would not affect how many messages you 
can pull. Can you clarify what you mean?



[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer

2015-06-19 Thread Dibyendu Bhattacharya (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593273#comment-14593273 ]

Dibyendu Bhattacharya commented on SPARK-8474:
--

I hit this problem just once and have not been able to reproduce it since. 
Here is the executor trace from that occurrence. I am not sure whether the 
problem is related to some Kafka issue where the leader offset comes back 
wrong.


15/06/18 09:01:21 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/06/18 09:01:21 INFO SecurityManager: Changing view acls to: hadoop
15/06/18 09:01:21 INFO SecurityManager: Changing modify acls to: hadoop
15/06/18 09:01:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/18 09:01:22 INFO Slf4jLogger: Slf4jLogger started
15/06/18 09:01:22 INFO Remoting: Starting remoting
15/06/18 09:01:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@10.252.5.54:45553]
15/06/18 09:01:22 INFO Utils: Successfully started service 'driverPropsFetcher' on port 45553.
15/06/18 09:01:23 INFO SecurityManager: Changing view acls to: hadoop
15/06/18 09:01:23 INFO SecurityManager: Changing modify acls to: hadoop
15/06/18 09:01:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/06/18 09:01:23 INFO Slf4jLogger: Slf4jLogger started
15/06/18 09:01:23 INFO Remoting: Starting remoting
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/06/18 09:01:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@10.252.5.54:56579]
15/06/18 09:01:23 INFO Utils: Successfully started service 'sparkExecutor' on port 56579.
15/06/18 09:01:23 INFO DiskBlockManager: Created local directory at /mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/blockmgr-2badd56a-7877-44d7-bb67-c309935ce1ba
15/06/18 09:01:23 INFO MemoryStore: MemoryStore started with capacity 883.8 MB
15/06/18 09:01:23 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkDriver@10.252.5.113:52972/user/CoarseGrainedScheduler
15/06/18 09:01:23 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@10.252.5.54:49197/user/Worker
15/06/18 09:01:23 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@10.252.5.54:49197/user/Worker
15/06/18 09:01:23 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
15/06/18 09:01:23 INFO Executor: Starting executor ID 1 on host 10.252.5.54
15/06/18 09:01:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34554.
15/06/18 09:01:23 INFO NettyBlockTransferService: Server created on 34554
15/06/18 09:01:23 INFO BlockManagerMaster: Trying to register BlockManager
15/06/18 09:01:24 INFO BlockManagerMaster: Registered BlockManager
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 0
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 1
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 2
15/06/18 09:01:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/18 09:01:24 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/06/18 09:01:24 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
15/06/18 09:01:24 INFO Executor: Fetching http://10.252.5.113:54193/jars/LowlevelKafkaConsumer-assembly-1.0.jar with timestamp 1434618080212
15/06/18 09:01:24 INFO Utils: Fetching http://10.252.5.113:54193/jars/LowlevelKafkaConsumer-assembly-1.0.jar to /mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/fetchFileTemp4240791741464959275.tmp
15/06/18 09:01:25 INFO Utils: Copying /mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/19875585461434618080212_cache to /mnt1/spark/work/app-20150618090120-0029/1/./LowlevelKafkaConsumer-assembly-1.0.jar
15/06/18 09:01:25 INFO Executor: Adding file:/mnt1/spark/work/app-20150618090120-0029/1/./LowlevelKafkaConsumer-assembly-1.0.jar to class loader
15/06/18 09:01:25 INFO TorrentBroadcast: Started reading broadcast variable 0
15/06/18 09:01:25 INFO MemoryStore: ensureFreeSpace(1399) called with curMem=0, maxMem=926731468
15/06/18 09:01:25 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1399.0 B, free 883.8 MB)
15/06/18 09:01:25 INFO TorrentBroadcast:

[jira] [Commented] (SPARK-8474) [STREAMING] Kafka DirectStream API stops receiving messages if collective size of the messages specified in spark.streaming.kafka.maxRatePerPartition exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer

2015-06-19 Thread Dibyendu Bhattacharya (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593285#comment-14593285 ]

Dibyendu Bhattacharya commented on SPARK-8474:
--

Yes, right.

This may be a false alarm; I did not see any issue with the logic. As far as I 
can tell, KafkaRDD keeps pulling chunks of up to fetch.message.max.bytes (1 MB) 
in every fetchBatch, and it keeps doing so until it reaches the untilOffset, 
so I may be wrong here. I hit the issue once and have not been able to 
reproduce it since. I shared the executor trace from that run, and I can see 
an OffsetOutOfRange issue. I am not sure where that came from, since I 
launched the receiver for the very first time, starting from the earliest 
offset.
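
To make that loop concrete, a simplified, self-contained simulation of the 
consumption logic as I read it (illustrative only, not the actual KafkaRDD 
source; all names and sizes are made up):

object FetchLoopSketch {
  case class Msg(offset: Long, sizeBytes: Int)

  val maxBytes = 1024 * 1024 // stands in for fetch.message.max.bytes
  val untilOffset = 2500L    // stands in for part.untilOffset

  // one simulated FetchRequest: returns messages from `from` until the
  // byte budget or the end of the requested range is exhausted
  def fetchBatch(from: Long): Vector[Msg] = {
    var budget = maxBytes
    var off = from
    Iterator.continually { val m = Msg(off, 700); off += 1; m }
      .takeWhile { m => budget -= m.sizeBytes; budget >= 0 && m.offset < untilOffset }
      .toVector
  }

  def main(args: Array[String]): Unit = {
    var requestOffset = 0L
    var requests = 0
    while (requestOffset < untilOffset) {
      val batch = fetchBatch(requestOffset)
      assert(batch.nonEmpty, s"ran out of messages before reaching $untilOffset")
      requestOffset = batch.last.offset + 1
      requests += 1
    }
    // the 1 MB cap bounds one request, not the whole range: the loop still
    // reaches untilOffset, just in more round trips
    println(s"reached untilOffset=$untilOffset in $requests fetch requests")
  }
}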

Just to mention: in all subsequent runs I never saw output like this from the 
shared log:

15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 0 offsets 0 - 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 1 offsets 0 - 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 2 offsets 0 - 2338

Something must have gone wrong when the offset ranges were computed, I guess. 
This is a very old topic, so its offsets cannot start from zero (0).
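
One way to check that claim is to ask the broker directly for the earliest 
retained offset, sketched here against the 0.8 SimpleConsumer offset API 
(broker host and client id are placeholders):

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

val consumer = new SimpleConsumer("broker-host", 9092, 10000, 64 * 1024, "offset-check")
val tp = TopicAndPartition("valid_subpub", 0)
// ask for the earliest offset the broker still retains for this partition;
// on an old topic with retention-based deletion this should be > 0
val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
val earliest = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
println(s"earliest retained offset for $tp: $earliest")
consumer.close()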


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org