[jira] [Commented] (SPARK-20287) Kafka Consumer should be able to subscribe to more than one topic partition

2017-04-17 Thread Stephane Maarek (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971893#comment-15971893 ]

Stephane Maarek commented on SPARK-20287:
-----------------------------------------

[~c...@koeninger.org] It makes sense. I hadn't realized that, in the direct stream 
model, the driver is in charge of assigning metadata to the executors so they can 
pull data. So yes, you're right: it's "incompatible" with the Kafka way of being 
"master-free", where each consumer really doesn't know and shouldn't care about 
how many other consumers there are. I think this ticket can now be closed (feel 
free to re-open it if you disagree). It may be worth opening a KIP on the Kafka 
side to add APIs that would let Spark be a bit more "optimized", but it all seems 
okay for now. Cheers!
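
For anyone following along, a rough sketch of the assign/seek pattern the direct 
stream relies on may help. This is illustrative only, not the actual 
CachedKafkaConsumer code; the object/method names, consumer properties, and poll 
timeout are invented for the example.

```
import java.{util => ju}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object RangeReadSketch {
  // Executor-side view: the driver has already decided that this task owns one
  // topic-partition and a [fromOffset, untilOffset) range of it.
  def readRange(brokers: String, groupId: String, tp: TopicPartition,
                fromOffset: Long, untilOffset: Long): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("group.id", groupId)
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.assign(ju.Arrays.asList(tp)) // manual assignment: no group rebalancing involved
    consumer.seek(tp, fromOffset)         // start exactly where the driver said

    // Assumes untilOffset was computed by the driver from the broker's latest
    // offset, so the whole range is known to exist.
    var nextOffset = fromOffset
    while (nextOffset < untilOffset) {
      for (record <- consumer.poll(512).asScala if record.offset() < untilOffset) {
        // process(record) ...
        nextOffset = record.offset() + 1
      }
    }
    consumer.close()
  }
}
```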

> Kafka Consumer should be able to subscribe to more than one topic partition
> ----------------------------------------------------------------------------
>
> Key: SPARK-20287
> URL: https://issues.apache.org/jira/browse/SPARK-20287
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Stephane Maarek
>
> As I understand it, and as it stands, one Kafka consumer is created for each 
> topic partition in the source Kafka topics, and these consumers are cached.
> cf 
> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala#L48
> In my opinion, that makes the design an anti-pattern for Kafka and highly 
> inefficient:
> - Each Kafka consumer creates a connection to Kafka
> - Spark doesn't leverage the power of Kafka consumer groups, which 
> automatically assign and balance partitions amongst all the consumers that 
> share the same group.id
> - You can still cache your Kafka consumer even if it has multiple partitions.
> I'm not sure how that translates to Spark's underlying RDD architecture, but 
> from a Kafka standpoint, I believe creating one consumer per partition is a big 
> overhead, and a risk, as the user may have to increase the 
> spark.streaming.kafka.consumer.cache.maxCapacity parameter.
> Happy to discuss to understand the rationale.




[jira] [Commented] (SPARK-20287) Kafka Consumer should be able to subscribe to more than one topic partition

2017-04-12 Thread Stephane Maarek (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966973#comment-15966973 ]

Stephane Maarek commented on SPARK-20287:
-----------------------------------------

[~c...@koeninger.org] 
How about using the subscribe pattern?
https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

```
public void subscribe(Collection<String> topics)
Subscribe to the given list of topics to get dynamically assigned partitions. 
Topic subscriptions are not incremental. This list will replace the current 
assignment (if there is one). It is not possible to combine topic subscription 
with group management with manual partition assignment through 
assign(Collection). If the given list of topics is empty, it is treated the 
same as unsubscribe().
```

Then you could let Kafka handle the partition assignments? Since all the 
consumers share the same group.id, the data would effectively be distributed 
across every Spark instance?

But I guess you may have already explored that option and it conflicts with the 
Spark direct stream API? (I'm not a Spark expert, just trying to understand the 
limitations; I believe you when you say you did it the most straightforward way.)
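
For concreteness, a minimal sketch of what the group-managed approach would look 
like (the broker address, topic names, group id, and object name are 
placeholders):

```
import java.{util => ju}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object SubscribeSketch {
  def main(args: Array[String]): Unit = {
    // Every consumer created with the same group.id gets a dynamically balanced
    // share of the partitions; Kafka's group coordinator does the assignment.
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")
    props.put("group.id", "spark-app")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(ju.Arrays.asList("topicA", "topicB")) // partition assignment handled by Kafka

    while (true) {
      for (record <- consumer.poll(100).asScala) {
        // process record.value() ...
      }
    }
  }
}
```

The tension, as noted further down the thread, is that group-managed assignment 
doesn't let the driver hand each task an exact offset range to read.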


[jira] [Commented] (SPARK-20287) Kafka Consumer should be able to subscribe to more than one topic partition

2017-04-12 Thread Cody Koeninger (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966180#comment-15966180 ]

Cody Koeninger commented on SPARK-20287:


The issue here is that the underlying new Kafka consumer API doesn't give a 
single consumer a way to subscribe to multiple partitions yet read only a 
particular range of messages from one of them.

The max capacity is just a simple bound on what is basically an LRU cache: if 
someone creates topics dynamically and then stops sending messages to them, you 
don't want to keep leaking resources.

I'm not claiming there's anything great or elegant about those solutions, but 
they were pretty much the most straightforward way to make the direct stream 
model work with the new Kafka consumer API.
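
For context, the bounded-LRU idea looks roughly like this. This is an 
illustrative sketch with invented class and method names, not the actual 
CachedKafkaConsumer code:

```
import java.{util => ju}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// One cached consumer per (group.id, topic-partition); once the cache grows
// past maxCapacity, the least-recently-used consumer is closed and dropped.
class ConsumerCache(maxCapacity: Int) {
  private case class Key(groupId: String, tp: TopicPartition)
  private type Consumer = KafkaConsumer[Array[Byte], Array[Byte]]

  private val cache =
    new ju.LinkedHashMap[Key, Consumer](maxCapacity, 0.75f, true /* access order = LRU */) {
      override def removeEldestEntry(eldest: ju.Map.Entry[Key, Consumer]): Boolean =
        if (size() > maxCapacity) { eldest.getValue.close(); true } else false
    }

  def getOrCreate(groupId: String, tp: TopicPartition)(create: => Consumer): Consumer = {
    val key = Key(groupId, tp)
    Option(cache.get(key)).getOrElse {
      val consumer = create
      cache.put(key, consumer)
      consumer
    }
  }
}
```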


[jira] [Commented] (SPARK-20287) Kafka Consumer should be able to subscribe to more than one topic partition

2017-04-11 Thread Stephane Maarek (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963939#comment-15963939 ]

Stephane Maarek commented on SPARK-20287:
-----------------------------------------

The other issue I can see is the coordinator work needed to rebalance a large 
number of Kafka consumers should one go down. That's more expensive with 100 
consumers than with a few. But as you said, this should be driven by actual 
performance limitations; right now that'd be speculation.


[jira] [Commented] (SPARK-20287) Kafka Consumer should be able to subscribe to more than one topic partition

2017-04-11 Thread Stephane Maarek (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963938#comment-15963938 ]

Stephane Maarek commented on SPARK-20287:
-----------------------------------------

[~srowen] those are good points. In the case of 100 tasks on 100 separate 
machines, I agree you have 100 Kafka consumers no matter what. As you said, the 
optimisation I had in mind would come when tasks on the same machine could share 
a Kafka consumer.
My concern is, as you said, the number of connections opened to Kafka, which 
might be higher than needed. I understand that one Kafka consumer distributing 
records to multiple tasks may bind them together on the receive side, and since 
I'm not a Spark expert I can't measure the performance implications of that.

My remaining concern is the spark.streaming.kafka.consumer.cache.maxCapacity 
parameter: is it truly needed? Say one executor consumes from 100 partitions; do 
we really need a maxCapacity setting, or should the executor just spin up as many 
consumers as needed? Likewise, in a distributed context, can't the individual 
executors figure out how many Kafka consumers they need?
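
For reference, today that bound is simply user-tuned in the streaming-kafka-0-10 
integration, where this parameter lives. A minimal fragment, with the app name 
and value as placeholders:

```
import org.apache.spark.SparkConf

object TuningExample {
  // An application whose executors read many partitions might raise the
  // per-executor consumer cache size like this (128 is just an example value).
  val conf = new SparkConf()
    .setAppName("kafka-direct-app")
    .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
}
```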

Thanks for the discussion, I appreciate it


[jira] [Commented] (SPARK-20287) Kafka Consumer should be able to subscribe to more than one topic partition

2017-04-11 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963920#comment-15963920 ]

Sean Owen commented on SPARK-20287:
-----------------------------------

Spark has a different execution model, though: it wants to distribute the 
processing of partitions into logically separate (and sometimes physically 
separate) tasks. It makes sense to consume one Kafka partition as one Spark 
partition. If you have 100 workers consuming 100 partitions on 100 different 
machines, there's no way to share those consumers, right?

There might be some scope to use a single consumer to read n Kafka partitions on 
behalf of n Spark tasks when they happen to land in one executor. Does that solve 
a problem, though? You say you think it might be a big overhead, but is it? The 
overhead amounts to more connections than strictly needed, which I could see 
becoming a problem at thousands of tasks.

The flip side is that sharing has its own complexity and, I presume, bottlenecks 
that would bind tasks together. That could be problematic, but I haven't thought 
through the details.

I think you'd have to make more of a case that it's a problem, and then propose a 
solution.
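
Purely as a thought experiment, the "one consumer per executor" idea might look 
something like the following. This is hypothetical and not how Spark works today; 
the object and method names are invented:

```
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object SharedConsumerSketch {
  // One consumer is manually assigned every partition that landed on this
  // executor; each co-located task then reads only its own partition's records.
  def pollForTasks(consumer: KafkaConsumer[String, String],
                   partitions: Seq[TopicPartition]): Map[TopicPartition, Seq[String]] = {
    consumer.assign(partitions.asJava)
    val batch = consumer.poll(100) // a single fetch serves all co-located tasks
    partitions.map { tp =>
      tp -> batch.records(tp).asScala.map(_.value()).toSeq
    }.toMap
  }
}
```

Even in this toy version the bottleneck mentioned above is visible: every task's 
progress is gated on the shared poll().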
