Re: KafkaInputDStream mapping of partitions to tasks
Hi,

Has anyone worked on this?

Best
Aries

On 30 Mar 2014, at 3:22, Nicolas Bär wrote:
> Hi
>
> Is there any workaround to this problem?
> [...]
Re: KafkaInputDStream mapping of partitions to tasks
Hi

Is there any workaround to this problem?

I'm trying to implement a KafkaReceiver using the SimpleConsumer API [1] of Kafka and handle the partition assignment manually. The easiest setup in this case would be to bind the number of parallel jobs to the number of partitions in Kafka. This is basically what Samza [2] does. I have a few questions regarding this implementation:

The getReceiver method looks like a good starting point to implement the manual partition assignment. Unfortunately it lacks documentation. As far as I understood from the API, the getReceiver method is called once and passes the returned object (implementing the NetworkReceiver contract) to the worker nodes. Therefore the logic to assign partitions to each receiver has to be implemented within the receiver itself. I'm planning to implement the following and have some questions in this regard (see the fetch-loop sketch after this message):

1. Within getReceiver: set up a ZooKeeper queue with the assignment of partitions to parallel jobs and store the number of consumers needed.
   - Is the number of parallel jobs accessible through ssc?
2. Within onStart: poll the ZooKeeper queue and receive the partition number(s) to receive data from.
3. Within onStart: start a new thread for keepalive messages to ZooKeeper. In case a node goes down and a new receiver is started up again, the new receiver can look up ZooKeeper to find the consumer without recent keepalives and take its place. Due to SPARK-1340 this might not even be possible yet. Is there any insight into why the receiver is not started again?

Do you think the use of ZooKeeper in this scenario is a good approach, or is there an easier way using built-in Spark functionality? Also, by using the SimpleConsumer API one has to manage the offset per consumer. The high-level consumer API solves this problem with ZooKeeper. I'd take the same approach if there's no easier way to handle this in Spark.

Best
Nicolas

[1] https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
[2] http://samza.incubator.apache.org/learn/documentation/0.7.0/container/task-runner.html

On Fri, Mar 28, 2014 at 2:36 PM, Evgeniy Shishkin wrote:
> One more question,
>
> we are using memory_and_disk_ser_2
> and i worried when those rdds on disk will be removed
> [...]
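For illustration of the SimpleConsumer-based receiver described above, here is a minimal sketch of a per-partition fetch loop as it could run on a thread started from a custom receiver's onStart (Kafka 0.8 SimpleConsumer API; the host, client id, topic, fetch sizes and the handle callback are placeholders, and leader lookup, error handling, the ZooKeeper-based partition assignment and offset bookkeeping are deliberately left out):

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer

    // Fetch loop for one Kafka partition. Everything named here is illustrative;
    // this is not the proposed receiver, just the raw SimpleConsumer usage.
    def consumePartition(host: String, port: Int, topic: String, partition: Int,
                         startOffset: Long)(handle: String => Unit): Unit = {
      val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "spark-receiver")
      var offset = startOffset
      try {
        while (true) {
          val request = new FetchRequestBuilder()
            .clientId("spark-receiver")
            .addFetch(topic, partition, offset, 1024 * 1024)
            .build()
          val response = consumer.fetch(request)
          // A real implementation would check response.hasError and back off
          // when a fetch returns no messages instead of spinning.
          for (msgAndOffset <- response.messageSet(topic, partition)) {
            val payload = msgAndOffset.message.payload
            val bytes = new Array[Byte](payload.limit())
            payload.get(bytes)
            handle(new String(bytes, "UTF-8")) // hand the record over to the receiver
            offset = msgAndOffset.nextOffset
          }
        }
      } finally {
        consumer.close()
      }
    }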
Re: KafkaInputDStream mapping of partitions to tasks
One more question,

we are using memory_and_disk_ser_2, and I am worried about when those RDDs on disk will be removed: http://i.imgur.com/dbq5T6i.png

unpersist is set to true, and RDDs get purged from memory, but disk space just keeps growing.

On 28 Mar 2014, at 01:32, Tathagata Das wrote:
> Yes, no one has reported this issue before. I just opened a JIRA on what I
> think is the main problem here: https://spark-project.atlassian.net/browse/SPARK-1340
> [...]
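For context, a minimal sketch of the setup being described, i.e. received blocks stored as MEMORY_AND_DISK_SER_2 with automatic unpersisting of old RDDs enabled (the app name, ZooKeeper address, group and topic are placeholders, not the actual job):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setAppName("kafka-streaming")
      .set("spark.streaming.unpersist", "true") // let Spark drop RDDs that fall out of the window

    val ssc = new StreamingContext(conf, Seconds(10))

    // Received blocks are serialized, replicated to two nodes, and spill to disk.
    val stream = KafkaUtils.createStream(ssc, "zk1:2181", "my-group",
      Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)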
Re: KafkaInputDStream mapping of partitions to tasks
On 28 Mar 2014, at 02:10, Scott Clasen wrote:

> Thanks everyone for the discussion.
>
> Just to note, I restarted the job yet again, and this time there are indeed
> tasks being executed by both worker nodes. So the behavior does seem
> inconsistent/broken atm.
>
> Then I added a third node to the cluster, and a third executor came up, and
> everything broke :|

This is Kafka's high-level consumer. Try raising the rebalance retries. Also, as this consumer is threaded, it has some protection against this failure: first it waits some time, and then rebalances. But for a Spark cluster I think this time is not enough. If there was a way to wait for every Spark executor to start and rebalance, and only then start to consume, this issue would be less visible.
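For reference, a hedged sketch of raising the rebalance settings through the kafkaParams variant of KafkaUtils.createStream (assuming an existing StreamingContext ssc; the specific values, ZooKeeper address, group and topic are illustrative, and the keys are passed straight through to Kafka's high-level consumer):

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Consumer settings handed to Kafka's high-level consumer.
    val kafkaParams = Map(
      "zookeeper.connect"     -> "zk1:2181",
      "group.id"              -> "my-consumer-group",
      "rebalance.max.retries" -> "20",   // default is 4; give slow-starting executors more chances
      "rebalance.backoff.ms"  -> "5000"  // wait longer between rebalance attempts
    )

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)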
Re: KafkaInputDStream mapping of partitions to tasks
Thanks everyone for the discussion.

Just to note, I restarted the job yet again, and this time there are indeed tasks being executed by both worker nodes. So the behavior does seem inconsistent/broken atm.

Then I added a third node to the cluster, and a third executor came up, and everything broke :|
Re: KafkaInputDStream mapping of partitions to tasks
On 28 Mar 2014, at 01:38, Evgeny Shishkin wrote:
>
> On 28 Mar 2014, at 01:32, Tathagata Das wrote:
>
>> Yes, no one has reported this issue before. I just opened a JIRA on what I
>> think is the main problem here: https://spark-project.atlassian.net/browse/SPARK-1340
>> Some of the receivers don't get restarted.
>> I have a bunch of refactoring in the NetworkReceiver ready to be posted as a PR
>> that should fix this.

Regarding this JIRA: by default Spark commits offsets to ZooKeeper only every so often. Even if you fix reconnecting to Kafka, we do not know from which offsets it will begin to consume, so it will not recompute the RDD as it should; it will receive arbitrary data, from the past or from the future. With the high-level consumer we just do not have control over this.

The high-level consumer should not be used in production with Spark. Period. Spark should use the low-level consumer and control offsets and partition assignment deterministically, like Storm does.

>> Regarding the second problem, I have been thinking of adding flow control
>> (i.e. limiting the rate of receiving) for a while, just haven't gotten around
>> to it.
>> I added another JIRA for tracking this issue:
>> https://spark-project.atlassian.net/browse/SPARK-1341

I think if we fix the kafka input as above, we can control such a window automatically, like TCP does with its window and slow start. But it will be great to have some fix available now anyway.

> Thank you, I will participate and can help test the new code.
> Sorry for the capslock, I literally spent this whole day debugging.
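To make the offset discussion concrete, these are the high-level consumer settings involved; a hedged sketch only, with the values shown being the Kafka 0.8 defaults as I understand them, and the ZooKeeper address and group placeholders:

    // Settings that govern where a restarted high-level consumer resumes.
    val kafkaParams = Map(
      "zookeeper.connect"       -> "zk1:2181",
      "group.id"                -> "my-consumer-group",
      "auto.commit.enable"      -> "true",    // offsets are committed in the background...
      "auto.commit.interval.ms" -> "60000",   // ...on a timer, not when a batch finishes
      "auto.offset.reset"       -> "largest"  // used when no committed offset is found
    )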
Re: KafkaInputDStream mapping of partitions to tasks
On 28 Mar 2014, at 01:32, Tathagata Das wrote:

> Yes, no one has reported this issue before. I just opened a JIRA on what I
> think is the main problem here: https://spark-project.atlassian.net/browse/SPARK-1340
> Some of the receivers don't get restarted.
> I have a bunch of refactoring in the NetworkReceiver ready to be posted as a PR
> that should fix this.
>
> Regarding the second problem, I have been thinking of adding flow control
> (i.e. limiting the rate of receiving) for a while, just haven't gotten around
> to it.
> I added another JIRA for tracking this issue:
> https://spark-project.atlassian.net/browse/SPARK-1341

Thank you, I will participate and can help test the new code.
Sorry for the capslock, I literally spent this whole day debugging.
Re: KafkaInputDStream mapping of partitions to tasks
Yes, no one has reported this issue before. I just opened a JIRA on what I think is the main problem here: https://spark-project.atlassian.net/browse/SPARK-1340
Some of the receivers don't get restarted. I have a bunch of refactoring in the NetworkReceiver ready to be posted as a PR that should fix this.

Regarding the second problem, I have been thinking of adding flow control (i.e. limiting the rate of receiving) for a while, just haven't gotten around to it. I added another JIRA for tracking this issue: https://spark-project.atlassian.net/browse/SPARK-1341

TD

On Thu, Mar 27, 2014 at 3:23 PM, Evgeny Shishkin wrote:
>
> On 28 Mar 2014, at 01:11, Scott Clasen wrote:
>
> > Evgeniy Shishkin wrote
> >> So, at the bottom -- kafka input stream just does not work.
> [...]
Re: KafkaInputDStream mapping of partitions to tasks
On 28 Mar 2014, at 01:11, Scott Clasen wrote:

> Evgeniy Shishkin wrote
>> So, at the bottom -- kafka input stream just does not work.
>
> That was the conclusion I was coming to as well. Are there open tickets
> around fixing this up?

I am not aware of any. Actually nobody has complained about spark+kafka before, so I thought it just worked, and then we tried to build something on it and almost failed.

I think it is possible to steal/replicate how Twitter Storm works with Kafka: they do manual partition assignment, which would at least help to balance the load.

There is another issue: ssc creates new RDDs every batch duration, always, even if the previous computation did not finish. But with Kafka we could consume more RDDs later, after we finish the previous ones. That way it would be much, much simpler not to get OOM'ed when starting from the beginning, because right now we can consume a lot of data from Kafka during one batch duration and then get an OOM. But we just cannot start slow, cannot limit how much to consume during a batch.
Re: KafkaInputDStream mapping of partitions to tasks
Evgeniy Shishkin wrote
> So, at the bottom -- kafka input stream just does not work.

That was the conclusion I was coming to as well. Are there open tickets around fixing this up?
Re: KafkaInputDStream mapping of partitions to tasks
On 28 Mar 2014, at 00:34, Scott Clasen wrote:

> Actually, looking closer it is stranger than I thought. In the Spark UI, one
> executor has executed 4 tasks, and one has executed 1928.
>
> Can anyone explain the workings of a KafkaInputStream wrt kafka partitions
> and mapping to spark executors and tasks?

Well, there are some issues with the kafka input right now.

When you do KafkaUtils.createStream, it creates a Kafka high-level consumer on one node! I don't really know how many RDDs it will generate during a batch window, but when those RDDs are created, Spark schedules the consecutive transformations on that same node, because of locality. You can try to repartition() those RDDs; sometimes it helps.

To consume from Kafka on multiple machines you can do (1 to N).map(KafkaUtils.createStream). But then an issue with the Kafka high-level consumer arises: those consumers operate in one consumer group and try to decide among themselves which consumer consumes which partitions, and they may simply fail to do the sync-partition-rebalance, and then you have only a few consumers really consuming. To mitigate this problem, you can set the rebalance retries very high, and pray it helps.

Then there is yet another "feature": if your receiver dies (OOM, hardware failure), you just stop receiving from Kafka! Brilliant.

And another one: if you ask Spark's kafka input to begin with auto.offset.reset = smallest, it will reset your offsets every time you run the application! It does not comply with the documentation (reset to the earliest offsets only if no offsets are found in ZooKeeper); it just erases your offsets and starts again from zero.

And remember that you have to restart your streaming app when there is any failure on a receiver!

So, at the bottom: the kafka input stream just does not work.
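A minimal sketch of the multiple-receiver workaround mentioned above (assuming an existing StreamingContext ssc; the ZooKeeper address, group, topic, receiver count and partition count are placeholders):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    val numReceivers = 8 // e.g. one receiver per Kafka partition

    // Each createStream call starts its own high-level consumer (its own receiver),
    // all joined in the same consumer group.
    val kafkaStreams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group",
        Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
    }

    // Union the per-receiver streams and repartition so downstream tasks are
    // spread over the cluster instead of staying on the receiver nodes.
    val unified = ssc.union(kafkaStreams).repartition(16)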
Re: KafkaInputDStream mapping of partitions to tasks
Actually, looking closer it is stranger than I thought. In the Spark UI, one executor has executed 4 tasks, and one has executed 1928.

Can anyone explain the workings of a KafkaInputStream wrt kafka partitions and mapping to spark executors and tasks?
Re: KafkaInputDStream mapping of partitions to tasks
If you call repartition() on the original stream you can set the level of parallelism after it's ingested from Kafka. I'm not sure how it maps kafka topic partitions to tasks for the ingest, though.

On Thu, Mar 27, 2014 at 11:09 AM, Scott Clasen wrote:

> I have a simple streaming job that creates a kafka input stream on a topic
> with 8 partitions, and does a forEachRDD.
>
> The job and tasks are running on mesos, and there are two tasks running, but
> only 1 task doing anything.
>
> I also set spark.streaming.concurrentJobs=8 but still there is only 1 task
> doing work. I would have expected that each task took a subset of the
> partitions.
>
> Is there a way to make more than one task share the work here? Are my
> expectations off here?
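For illustration, a minimal sketch of the repartition() suggestion (assuming an existing StreamingContext ssc; the ZooKeeper address, group, topic and partition counts are placeholders):

    import org.apache.spark.streaming.kafka.KafkaUtils

    // One receiver pulls from the 8-partition topic...
    val stream = KafkaUtils.createStream(ssc, "zk1:2181", "my-group", Map("my-topic" -> 8))

    // ...then the received data is shuffled across the cluster, so the per-batch
    // processing is no longer pinned to the node running the receiver.
    stream.repartition(16).foreachRDD { rdd =>
      println("records in this batch: " + rdd.count())
    }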