Python API for spark-streaming-kafka in Spark 3.2.1

2022-03-07 Thread Wiśniewski Michał
Hi,
I've read in the documentation that since Spark 3.2.1 the Python API for
spark-streaming-kafka is back in the game.
https://spark.apache.org/docs/3.2.1/streaming-programming-guide.html#advanced-sources

But in the Kafka Integration Guide there is no documentation for the Python API.
https://spark.apache.org/docs/3.2.1/streaming-kafka-0-10-integration.html

Where can I find basic information on how to read from/write to Kafka in
Spark Streaming in Spark 3.2.1 using Python?
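
(Editor's sketch, not part of the original message: whether or not the
DStream Python API is documented, the well-documented way to read from and
write to Kafka from Python is the Structured Streaming Kafka source. This
assumes spark-sql-kafka-0-10 is on the classpath, e.g. via
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1;
the broker address, topic names, and checkpoint path are placeholders.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-sketch").getOrCreate()

# Read: Kafka records arrive as binary key/value columns.
source = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "in_topic")
          .load()
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

# Write: the Kafka sink expects key/value columns (string or binary).
query = (source.writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("topic", "out_topic")
         .option("checkpointLocation", "/tmp/kafka-sketch-checkpoint")
         .start())

query.awaitTermination()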



---
Regards,
Michał Wiśniewski



Software Developer


Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-06 Thread Sethupathi T
Gabor,

Thanks for the clarification.

Thanks

On Fri, Sep 6, 2019 at 12:38 AM Gabor Somogyi 
wrote:

> Sethupathi,
>
> Let me extract then the important part what I've shared:
>
> 1. "This ensures that each Kafka source has its own consumer group that
> does not face interference from any other consumer"
> 2. Consumers may eat the data from each other, offset calculation may give
> back wrong result (that's the reason why "extreme caution" is recommended
> in Structured Streaming doc which still applies here)
> 3. yes
>
> BR,
> G
>
>
> On Thu, Sep 5, 2019 at 8:34 PM Sethupathi T 
> wrote:
>
>> Gabor,
>>
>> Thanks for the quick response and sharing about spark 3.0,  we need to
>> use spark streaming (KafkaUtils.createDirectStream) than structured
>> streaming by following this document
>> https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html and
>> re-iterating the issue again for better understanding.
>> spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html> 
>> kafka
>> connector prefix "spark-executor" + group.id for executors, driver uses
>> original group id.
>>
>> *Here is the code where executor construct executor specific group id *
>>
>>
>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>  line # 212,
>>
>>
>> *It would be great if you could provide the explanation to the following
>> questions.*
>>
>> #1 What was the specific reason for prefixing group id in executor ?
>>
>> #2 Will it be harmful if i build the custom spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>>   library by removing the group id prefix? at line # 212 in
>> KafkaUtils.scala
>> #3 KafkaUtils.scala is marked as @Experimental what does it mean? is it
>> advisable to use in production?
>>
>> *Here is the my spark streaming code snippet*
>>
>> val kafkaParams = Map[String, Object](
>>
>>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>>
>>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>>
>>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>>
>>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>>
>>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> 
>> classOf[StringDeserializer],
>>
>>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
>> classOf[MessageDeserializer],
>>
>>   "security.protocol" -> "SSL",
>>
>>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>>
>>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>>
>>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>>
>>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>>
>>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>>
>> )
>>
>> val stream = KafkaUtils.createDirectStream[String, Message](
>>
>>   ssc,
>>
>>   PreferConsistent,
>>
>>   Subscribe[String, Message](topicsSet, kafkaParams)
>>
>> )
>>
>> ---
>> Thanks in Advance,
>> Sethupathi.T
>>
>>
>> On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi 
>> wrote:
>>
>>> Hi,
>>>
>>> Let me share Spark 3.0 documentation part (Structured Streaming and not
>>> DStreams what you've mentioned but still relevant):
>>>
>>> kafka.group.id string none streaming and batch The Kafka group id to
>>> use in Kafka consumer while reading from Kafka. Use this with caution. By
>>> default, each query generates a unique group id for reading data. This
>>> ensures that each Kafka source has its own consumer group that does not
>>> face interference from any other consumer, and therefore can read all of
>>> the partitions of its subscribed topics. In some scenarios (for example,
>>> Kafka group-based authorization), you may want to use a specific authorized
>>> group id to read data. You can optionally set the group id. However, do
>>> this with extreme caution as it can cause unexpected behavior. Concurrently
>>> running queries (both, batch and streaming) or sources with the same group
>>> id are likely interfere with each other causing each query to read only
>>> part of the 

Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-06 Thread Gabor Somogyi
Sethupathi,

Let me extract the important parts of what I've shared:

1. "This ensures that each Kafka source has its own consumer group that
does not face interference from any other consumer"
2. Consumers may consume each other's data, and the offset calculation may
give back wrong results (that's the reason "extreme caution" is recommended
in the Structured Streaming doc, which still applies here)
3. Yes

BR,
G


On Thu, Sep 5, 2019 at 8:34 PM Sethupathi T 
wrote:

> Gabor,
>
> Thanks for the quick response and sharing about spark 3.0,  we need to use
> spark streaming (KafkaUtils.createDirectStream) than structured streaming
> by following this document
> https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html and
> re-iterating the issue again for better understanding.
> spark-streaming-kafka-0-10
> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html> 
> kafka
> connector prefix "spark-executor" + group.id for executors, driver uses
> original group id.
>
> *Here is the code where executor construct executor specific group id *
>
>
> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>  line # 212,
>
>
> *It would be great if you could provide the explanation to the following
> questions.*
>
> #1 What was the specific reason for prefixing group id in executor ?
>
> #2 Will it be harmful if i build the custom spark-streaming-kafka-0-10
> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>   library by removing the group id prefix? at line # 212 in
> KafkaUtils.scala
> #3 KafkaUtils.scala is marked as @Experimental what does it mean? is it
> advisable to use in production?
>
> *Here is the my spark streaming code snippet*
>
> val kafkaParams = Map[String, Object](
>
>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>
>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>
>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>
>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>
>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>
>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
> classOf[MessageDeserializer],
>
>   "security.protocol" -> "SSL",
>
>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>
>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>
>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>
>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>
>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>
> )
>
> val stream = KafkaUtils.createDirectStream[String, Message](
>
>   ssc,
>
>   PreferConsistent,
>
>   Subscribe[String, Message](topicsSet, kafkaParams)
>
> )
>
> ---
> Thanks in Advance,
> Sethupathi.T
>
>
> On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi 
> wrote:
>
>> Hi,
>>
>> Let me share Spark 3.0 documentation part (Structured Streaming and not
>> DStreams what you've mentioned but still relevant):
>>
>> kafka.group.id string none streaming and batch The Kafka group id to use
>> in Kafka consumer while reading from Kafka. Use this with caution. By
>> default, each query generates a unique group id for reading data. This
>> ensures that each Kafka source has its own consumer group that does not
>> face interference from any other consumer, and therefore can read all of
>> the partitions of its subscribed topics. In some scenarios (for example,
>> Kafka group-based authorization), you may want to use a specific authorized
>> group id to read data. You can optionally set the group id. However, do
>> this with extreme caution as it can cause unexpected behavior. Concurrently
>> running queries (both, batch and streaming) or sources with the same group
>> id are likely interfere with each other causing each query to read only
>> part of the data. This may also occur when queries are started/restarted in
>> quick succession. To minimize such issues, set the Kafka consumer session
>> timeout (by setting option "kafka.session.timeout.ms") to be very small.
>> When this is set, option "groupIdPrefix" will be ignored.
>> I think it answers your questions.
>>
>> As a general suggestion maybe it worth to revisit Spark 3.0 because
>> Structured Streaming has another interesting feature:
>> groupIdPrefix string spark-kafka-source streaming and batch Prefix of

Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-05 Thread Sethupathi T
Gabor,

Thanks for the quick response and for sharing the Spark 3.0 information. We
need to use Spark Streaming (KafkaUtils.createDirectStream) rather than
Structured Streaming, following this document:
https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
So I am re-iterating the issue for better understanding: the
spark-streaming-kafka-0-10
<https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
Kafka connector prefixes "spark-executor-" onto the group.id for executors,
while the driver uses the original group id.

*Here is the code where the executor constructs the executor-specific group id:*

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
at line #212.

*It would be great if you could provide explanations for the following
questions.*

#1 What was the specific reason for prefixing the group id on the executor?

#2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
<https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
library with the group id prefix removed at line #212 in KafkaUtils.scala?

#3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
advisable to use in production?

*Here is my Spark Streaming code snippet:*

val kafkaParams = Map[String, Object](

  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
classOf[MessageDeserializer],
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)

---
Thanks in Advance,
Sethupathi.T

On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi 
wrote:

> Hi,
>
> Let me share Spark 3.0 documentation part (Structured Streaming and not
> DStreams what you've mentioned but still relevant):
>
> kafka.group.id string none streaming and batch The Kafka group id to use
> in Kafka consumer while reading from Kafka. Use this with caution. By
> default, each query generates a unique group id for reading data. This
> ensures that each Kafka source has its own consumer group that does not
> face interference from any other consumer, and therefore can read all of
> the partitions of its subscribed topics. In some scenarios (for example,
> Kafka group-based authorization), you may want to use a specific authorized
> group id to read data. You can optionally set the group id. However, do
> this with extreme caution as it can cause unexpected behavior. Concurrently
> running queries (both, batch and streaming) or sources with the same group
> id are likely interfere with each other causing each query to read only
> part of the data. This may also occur when queries are started/restarted in
> quick succession. To minimize such issues, set the Kafka consumer session
> timeout (by setting option "kafka.session.timeout.ms") to be very small.
> When this is set, option "groupIdPrefix" will be ignored.
> I think it answers your questions.
>
> As a general suggestion maybe it worth to revisit Spark 3.0 because
> Structured Streaming has another interesting feature:
> groupIdPrefix string spark-kafka-source streaming and batch Prefix of
> consumer group identifiers (`group.id`) that are generated by structured
> streaming queries. If "kafka.group.id" is set, this option will be
> ignored.
>
> BR,
> G
>
>
> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
>  wrote:
>
>> Hi Team,
>>
>> We have secured Kafka cluster (which only allows to consume from the
>> pre-configured, authorized consumer group), there is a scenario where we
>> want to use spark streaming to consume from secured kafka. so we have
>> decided to use spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html> 
>> (it
>> supports SSL/TSL, Direct DStream, new Kafka consumer API, etc) api. When i
>> deploy the application in cluster mode, i realized that the actual group id
>> has been prefixed with "spark-executor" in executor configuration (executor
>> as trying to connect t

Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-05 Thread Gabor Somogyi
Hi,

Let me share part of the Spark 3.0 documentation (Structured Streaming
rather than the DStreams you mentioned, but still relevant):

kafka.group.id (string, default: none; streaming and batch): The Kafka group
id to use in the Kafka consumer while reading from Kafka. Use this with
caution. By default, each query generates a unique group id for reading
data. This ensures that each Kafka source has its own consumer group that
does not face interference from any other consumer, and therefore can read
all of the partitions of its subscribed topics. In some scenarios (for
example, Kafka group-based authorization), you may want to use a specific
authorized group id to read data. You can optionally set the group id.
However, do this with extreme caution as it can cause unexpected behavior.
Concurrently running queries (both batch and streaming) or sources with the
same group id are likely to interfere with each other, causing each query
to read only part of the data. This may also occur when queries are
started/restarted in quick succession. To minimize such issues, set the
Kafka consumer session timeout (by setting option
"kafka.session.timeout.ms") to be very small. When this is set, option
"groupIdPrefix" will be ignored.

I think it answers your questions.

As a general suggestion, it may be worth revisiting Spark 3.0, because
Structured Streaming has another interesting feature:

groupIdPrefix (string, default: spark-kafka-source; streaming and batch):
Prefix of consumer group identifiers (`group.id`) generated by structured
streaming queries. If "kafka.group.id" is set, this option will be ignored.
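
(Editor's sketch, not part of the original message: setting these options
from Python on a Structured Streaming Kafka source. It assumes a Spark 3.x
session with spark-sql-kafka-0-10 on the classpath; the broker, topic, and
group names are placeholders.)

df = (spark.readStream          # 'spark' is the active SparkSession
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "secured_topic")
      # Pin the pre-authorized group id (with the "extreme caution"
      # described above) ...
      .option("kafka.group.id", "authorized-group")
      # ... or leave kafka.group.id unset and only control the prefix of
      # the generated per-query group ids:
      # .option("groupIdPrefix", "myapp-")
      .load())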

BR,
G


On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
 wrote:

> Hi Team,
>
> We have secured Kafka cluster (which only allows to consume from the
> pre-configured, authorized consumer group), there is a scenario where we
> want to use spark streaming to consume from secured kafka. so we have
> decided to use spark-streaming-kafka-0-10
> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html> 
> (it
> supports SSL/TSL, Direct DStream, new Kafka consumer API, etc) api. When i
> deploy the application in cluster mode, i realized that the actual group id
> has been prefixed with "spark-executor" in executor configuration (executor
> as trying to connect to kafka with "spark-executor" + actual group id,
> which is not really exists and getting exception).
>
> *Here is the code where executor construct executor specific group id *
>
>
> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>  line
> # 212,
>
> *Here are my Questions*
>
> #1 What was the specific reason for prefixing group id in executor ?
>
> #2 Will it be harmful if i build the custom spark-streaming-kafka-0-10
> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
> library by removing the group id prefix?
>
> #3 KafkaUtils.scala is marked as @Experimental what does it mean? is it
> advisable to use in production?
>
> *Here is the my spark streaming code snippet*
>
> val kafkaParams = Map[String, Object](
>
>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
> classOf[MessageDeserializer],
>   "security.protocol" -> "SSL",
>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
> )
>
> val stream = KafkaUtils.createDirectStream[String, Message](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, Message](topicsSet, kafkaParams)
> )
>
> ---
> Thanks in Advance,
> Sethupathi.T
>


[Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

2019-09-05 Thread Sethupathi T
Hi Team,

We have a secured Kafka cluster (which only allows consuming from
pre-configured, authorized consumer groups), and there is a scenario where
we want to use Spark Streaming to consume from the secured Kafka. So we
have decided to use the spark-streaming-kafka-0-10
<https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
API (it supports SSL/TLS, Direct DStream, the new Kafka consumer API, etc.).
When I deployed the application in cluster mode, I realized that the actual
group id had been prefixed with "spark-executor-" in the executor
configuration (the executor tries to connect to Kafka with "spark-executor-"
+ the actual group id, which does not really exist, and gets an exception).

*Here is the code where the executor constructs the executor-specific group id:*

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
at line #212.

*Here are my questions:*

#1 What was the specific reason for prefixing the group id on the executor?

#2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
<https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
library with the group id prefix removed?

#3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
advisable to use in production?

*Here is my Spark Streaming code snippet:*

val kafkaParams = Map[String, Object](

  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
classOf[MessageDeserializer],
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)

---
Thanks in Advance,
Sethupathi.T
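
(Editor's illustration, a sketch of the behavior described above: the 0-10
DStream connector leaves the driver consumer on the configured group id but
subscribes each executor consumer under the prefixed id, so a cluster with
group-based authorization would need both ids authorized. The group name is
a placeholder.)

group_id = "my-authorized-group"                   # used by the driver consumer
executor_group_id = "spark-executor-" + group_id   # used by executor consumers
print(executor_group_id)                           # spark-executor-my-authorized-group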


Spark streaming kafka source delay occasionally

2019-08-15 Thread ans
Using the Kafka consumer with 2-minute batches, task processing takes 2~5
seconds in general, but some tasks take more than 40 seconds. I guess
*CachedKafkaConsumer#poll* could be the problem.

private def poll(timeout: Long): Unit = {
  val p = consumer.poll(timeout)
  val r = p.records(topicPartition)
  logDebug(s"Polled ${p.partitions()}  ${r.size}")
  buffer = r.iterator
}

I made changes to that method to add a timing log:

private def poll(timeout: Long): Unit = {
  val start = System.currentTimeMillis()
  val p = consumer.poll(timeout)
  val end = System.currentTimeMillis()
  val r = p.records(topicPartition)
  logInfo(s"Polled ${p.partitions()}  ${r.size} cost ${end - start}" +
    s" ${System.currentTimeMillis() - end} timeout setting ${timeout}")
  buffer = r.iterator
}

task log below:
19/08/15 16:46:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 124
19/08/15 16:46:00 INFO executor.Executor: Running task 3.0 in stage 10.0
(TID 124)
19/08/15 16:46:00 INFO broadcast.TorrentBroadcast: Started reading broadcast
variable 10
19/08/15 16:46:00 INFO memory.MemoryStore: Block broadcast_10_piece0 stored
as bytes in memory (estimated size 2.1 KB, free 366.2 MB)
19/08/15 16:46:00 INFO broadcast.TorrentBroadcast: Reading broadcast
variable 10 took 10 ms
19/08/15 16:46:00 INFO memory.MemoryStore: Block broadcast_10 stored as
values in memory (estimated size 3.6 KB, free 366.2 MB)
19/08/15 16:46:00 INFO kafka010.KafkaRDD: Computing topic test_topic,
partition 9 offsets 4382382407 -> 4382457892
*19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
705 cost 40050 0 timeout setting 12
*19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
691 cost 26 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
725 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
714 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
721 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
709 cost 13 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
719 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
734 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
728 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
724 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
719 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
738 cost 12 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
709 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
701 cost 18 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
718 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
734 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
702 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
717 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
711 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
715 cost 33 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
726 cost 11 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
713 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
697 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
719 cost 22 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
750 cost 13 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
707 cost 11 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
743 cost 13 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
746 cost 12 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
737 cost 11 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
715 cost 17 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [t
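
(Editor's note, grounded in the log above: only the first poll of the task
is slow, at 40050 ms, while every subsequent poll costs roughly 11-33 ms
against the same configured timeout, which points at the first fetch after
seeking to the batch's starting offset rather than at per-record
processing.)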

Why "spark-streaming-kafka-0-10" is still experimental?

2019-04-04 Thread Doaa Medhat
Dear all,

I'm working on a project that should integrate Spark Streaming with Kafka
using Java.
Currently the official documentation is confusing: it's not clear whether
"spark-streaming-kafka-0-10" is safe to be used in a production environment
or not.

According to the "Spark Streaming + Kafka Integration Guide"
https://spark.apache.org/docs/latest/streaming-kafka-integration.html
=> "spark-streaming-kafka-0-10" is marked as "Stable" and
"spark-streaming-kafka-0-8" is marked as "Deprecated".

However, in the "Spark Streaming + Kafka Integration Guide (Kafka broker
version 0.10.0 or higher)"
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
=> It's mentioned that "This version of the integration is marked as
experimental, so the API is potentially subject to change." and all the
APIs carry the @Experimental annotation. Also, it was initially released
around 3 years ago!

So, is it safe to use "spark-streaming-kafka-0-10" in production? Why is
this version still marked as experimental, and when is it supposed to be
marked as a stable version?

Thanks,


Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-30 Thread Cody Koeninger
You're using an older version of Spark, with what looks like a manually
included kafka-clients jar (1.0) of a different version than the one that
version of the Spark connector was written to depend on (0.10.0.1), so
there's no telling what's going on.
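
(Editor's note, a hedged suggestion following the diagnosis above: letting
the connector resolve its own kafka-clients transitively avoids this kind
of mismatch, e.g.

  spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.2 ...

instead of placing a hand-picked kafka-clients jar on the classpath.)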

On Wed, Aug 29, 2018 at 3:40 PM, Guillermo Ortiz Fernández
 wrote:
> I can't... do you think that it's a possible bug of this version?? from
> Spark or Kafka?
>
> El mié., 29 ago. 2018 a las 22:28, Cody Koeninger ()
> escribió:
>>
>> Are you able to try a recent version of spark?
>>
>> On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
>>  wrote:
>> > I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
>> > exception and Spark dies.
>> >
>> > I couldn't see any error or problem among the machines, anybody has the
>> > reason about this error?
>> >
>> >
>> > java.lang.IllegalStateException: This consumer has already been closed.
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
>> > ~[kafka-clients-1.0.0.jar:na]
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
>> > ~[kafka-clients-1.0.0.jar:na]
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
>> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
>> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
>> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at scala.Option.orElse(Option.scala:289)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > scala.collection.mutabl

Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Guillermo Ortiz Fernández
I can't... do you think it's possibly a bug in this version, in Spark or
Kafka?

On Wed, Aug 29, 2018 at 22:28, Cody Koeninger ()
wrote:

> Are you able to try a recent version of spark?
>
> On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
>  wrote:
> > I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
> > exception and Spark dies.
> >
> > I couldn't see any error or problem among the machines, anybody has the
> > reason about this error?
> >
> >
> > java.lang.IllegalStateException: This consumer has already been closed.
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
> > ~[kafka-clients-1.0.0.jar:na]
> > at
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
> > ~[kafka-clients-1.0.0.jar:na]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at scala.Option.orElse(Option.scala:289)
> ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > ~[scala-library-2.11.11.jar:na]
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > ~[scala-library-2.11.11.jar:na]
> > at
> > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> > ~[scala-library-2.11.11.jar:na]
> > at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> > ~[scala-library-2.11.11.jar:na]
> > at
> >
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> > at
> >
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> > ~[spark-streaming_2.

Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Cody Koeninger
Are you able to try a recent version of spark?

On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
 wrote:
> I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
> exception and Spark dies.
>
> I couldn't see any error or problem among the machines, anybody has the
> reason about this error?
>
>
> java.lang.IllegalStateException: This consumer has already been closed.
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
> ~[kafka-clients-1.0.0.jar:na]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
> ~[kafka-clients-1.0.0.jar:na]
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
> ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
> ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
> ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> ~[scala-library-2.11.11.jar:na]
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> ~[scala-library-2.11.11.jar:na]
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> ~[scala-library-2.11.11.jar:na]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> ~[scala-library-2.11.11.jar:na]
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> ~[scala-library-2.11.11.jar:na]
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(Job

Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Guillermo Ortiz Fernández
I'm using Spark Streaming 2.0.1 with Kafka 0.10; sometimes I get this
exception and Spark dies.

I couldn't see any error or problem on the machines. Does anybody know the
reason for this error?


java.lang.IllegalStateException: This consumer has already been closed.
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
~[kafka-clients-1.0.0.jar:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
~[kafka-clients-1.0.0.jar:na]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
~[scala-library-2.11.11.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
~[scala-library-2.11.11.jar:na]
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
~[scala-library-2.11.11.jar:na]
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
~[spark-core_2.11-2.0.2.15.jar:2.0.2.15]


Re: spark 2.3.1 with kafka spark-streaming-kafka-0-10 (java.lang.AbstractMethodError)

2018-06-28 Thread Peter Liu
Hello there,

I just upgraded to Spark 2.3.1 from Spark 2.2.1, ran my streaming workload,
and got an error (java.lang.AbstractMethodError) I had never seen before;
see the error stack attached in (a) below.

Does anyone know whether Spark 2.3.1 works well with
spark-streaming-kafka-0-10?

The Spark Kafka integration page doesn't mention any limitation:
https://spark.apache.org/docs/2.3.1/streaming-kafka-integration.html

But this discussion seems to say there is indeed an issue when upgrading to
Spark 2.3.1:
https://stackoverflow.com/questions/49180931/abstractmethoderror-creating-kafka-stream

I also rebuilt the workload with the Spark 2.3.1 jars (see (b) below); it
doesn't seem to help.

Would be great if anyone could kindly share any insights here.

Thanks!

Peter

(a) the exception
Exception in thread "stream execution thread for [id =
5adae836-268a-4ebf-adc4-e3cc9fbe5acf, runId =
70e78d5c-665e-4c6f-a0cc-41a56e488e30]" java.lang.AbstractMethodError
at
org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.initializeLogIfNecessary(KafkaSourceProvider.scala:369)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.log(KafkaSourceProvider.scala:369)
at
org.apache.spark.internal.Logging$class.logDebug(Logging.scala:58)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.logDebug(KafkaSourceProvider.scala:369)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:439)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:394)
at
org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
at
org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:277)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:80)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:77)
at
scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at
scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:77)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:75)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:75)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org
$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

(b) *the build script update:*

[pgl@datanode20 SparkStreamingBenchmark-RemoteConsumer-Spk231]$ diff
build.sbt spk211-build.sbt.original
10,11c10,11
< libraryDependencies += "org.apache.spark" % "spark-sql_2.11" %* "2.3.1"*
< libraryDependencies += "org.apache.spark" % "spark-core_2.11" %* "2.3.1"*
---
> libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.2.1"
> libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.1"
[pgl@datanode20 SparkStreamingBenchmark-RemoteConsumer-Spk231]$
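
(Editor's note, a hedged observation: the stack trace in (a) runs through
org.apache.spark.sql.kafka010.KafkaSourceProvider, i.e. the Structured
Streaming Kafka source, so the matching dependency likely also needs to be
bumped to 2.3.1 alongside spark-sql and spark-core, e.g.

libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.3.1"

since a java.lang.AbstractMethodError typically means one artifact on the
classpath was compiled against a different Spark version.)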


spark streaming kafka not displaying data in local eclipse

2018-01-16 Thread vr spark
Hi,
I have a simple Java program to read data from Kafka using Spark Streaming.

When I run it from Eclipse on my Mac, it connects to ZooKeeper and the
bootstrap nodes, but it does not display any data, and it does not give
any error.

It just shows:

18/01/16 20:49:15 INFO Executor: Finished task 96.0 in stage 0.0 (TID 0).
1412 bytes result sent to driver
18/01/16 20:49:15 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
2, localhost, partition 1, ANY, 5832 bytes)
18/01/16 20:49:15 INFO Executor: Running task 1.0 in stage 0.0 (TID 2)
18/01/16 20:49:15 INFO TaskSetManager: Finished task 96.0 in stage 0.0 (TID
0) in 111 ms on localhost (1/97)
18/01/16 20:49:15 INFO KafkaRDD: Computing topic data_stream, partition 16
offsets 25624028 -> 25624097
18/01/16 20:49:15 INFO VerifiableProperties: Verifying properties
18/01/16 20:49:15 INFO VerifiableProperties: Property auto.offset.reset is
overridden to largest
18/01/16 20:49:15 INFO VerifiableProperties: Property
fetch.message.max.bytes is overridden to 20971520
18/01/16 20:49:15 INFO VerifiableProperties: Property group.id is
overridden to VR-Test-Group
18/01/16 20:49:15 INFO VerifiableProperties: Property zookeeper.connect is
overridden to zk.kafka-cluster...:8091
18/01/16 20:49:25 INFO JobScheduler: Added jobs for time 151616456 ms
18/01/16 20:49:36 INFO JobScheduler: Added jobs for time 151616457 ms
18/01/16 20:49:45 INFO JobScheduler: Added jobs for time 151616458 ms
18/01/16 20:49:55 INFO JobScheduler: Added jobs for time 151616459 ms
18/01/16 20:50:07 INFO JobScheduler: Added jobs for time 151616460 ms
18/01/16 20:50:15 INFO JobScheduler: Added jobs for time 151616461 ms

But when I export it as a jar and run it on a remote Spark cluster, it does
display the actual data.

Please suggest what could be wrong.

thanks
VR
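
One detail visible in the log above: auto.offset.reset is overridden to
"largest", which makes this consumer start at the end of each partition, so a
topic that is idle during the local run yields empty batches. A minimal sketch
of forcing a from-the-beginning read for local testing (the two values marked
"from the log" are copied from above; everything else about the job is
assumed):

// Sketch only: 0.8-era consumer properties for local testing.
val kafkaParams = Map(
  "zookeeper.connect" -> "zk.kafka-cluster...:8091", // from the log
  "group.id"          -> "VR-Test-Group",            // from the log
  // "largest" (the logged value) skips everything already in the topic;
  // "smallest" replays each partition from the beginning.
  "auto.offset.reset" -> "smallest"
)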


Spark Streaming Kafka

2017-11-10 Thread Frank Staszak
Hi All, I'm new to streaming Avro records and am parsing Avro from a Kafka
direct stream with Spark Streaming 2.1.1. Could anyone please suggest an API
for decoding Avro records with Scala? I've found KafkaAvroDecoder,
twitter/bijection, and the Avro library; each seems to handle decoding. Has
anyone found benefits of using one over the others (for decoding)? It would
seem preferable to just retrieve the Avro schema from the schema registry and
then translate the Avro records to a case class. Is this the preferred way to
decode Avro using the KafkaAvroDecoder?

Thank you in advance,
-Frank
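
A minimal sketch of the registry-then-case-class route described above, using
only the plain Avro library (the schema fetch, field names, and the case class
are assumptions; note that the Confluent wire format prefixes each payload with
a magic byte and a 4-byte schema id, which KafkaAvroDecoder strips for you):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

case class Event(id: String, ts: Long) // illustrative target case class

// `schemaJson` is assumed to have been fetched from the schema registry;
// `payload` is the raw Avro body of one Kafka record value.
def decode(schemaJson: String, payload: Array[Byte]): Event = {
  val schema  = new Schema.Parser().parse(schemaJson)
  val reader  = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(payload, null)
  val record  = reader.read(null, decoder)
  Event(record.get("id").toString, record.get("ts").asInstanceOf[Long])
}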



Spark Streaming + Kafka + Hive: delayed

2017-09-20 Thread toletum
Hello.

I have a Python process that reads a Kafka queue and, for each record, checks
whether its id exists in a table.

# Load the table into memory
table = sqlContext.sql("select id from table")
table.cache()

def processForeach(time, rdd):
    print(time)
    for k in rdd.collect():
        if table.filter("id = '%s'" % k["id"]).count() > 0:
            print(k)

kafkaTopic.foreachRDD(processForeach)

The problem is that, little by little, the Spark batch time lags further
behind; I can see it in the print(time) output. The Kafka topic carries at
most 3 messages per second.
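
For context, each filter(...).count() inside that loop launches a separate
Spark job per record, which would explain the steadily growing lag. A hedged
sketch of doing the membership check once per batch instead (Scala here for
illustration, records assumed to expose an "id" field; the same shape works in
PySpark):

// Materialize the cached ids once on the driver, broadcast them, and run
// a single filter per batch instead of one Spark job per record.
val ids  = table.collect().map(_.getString(0)).toSet
val idsB = sc.broadcast(ids)

kafkaTopic.foreachRDD { (rdd, time) =>
  println(time)
  rdd.filter(k => idsB.value.contains(k.id)).collect().foreach(println)
}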


Spark Streaming Kafka Job has strange behavior for certain tasks

2017-04-05 Thread Justin Miller
Greetings!

I've been running various Spark streaming jobs to persist data from Kafka
topics, and one persister in particular seems to have issues. I've verified
that the number of messages is the same per partition (roughly, of course) and
the volume of data is a fraction of the volume of other persisters that appear
to be working fine.

The tasks proceed fine until approximately tasks 74-80 (of 96) have finished,
and then the remaining tasks take a while. I'm using EMR/Spark 2.1.0/Kafka
0.10.0.1/EMRFS (EMR's S3 solution). Any help would be greatly appreciated!

Here's the code I'm using to do the transformation:

val transformedData = transformer(sqlContext.createDataFrame(values, 
converter.schema))

transformedData
  .write
  .mode(Append)
  .partitionBy(persisterConfig.partitioning: _*)
  .format("parquet")
  .save(parquetPath)

Here's the output of the job as it's running (thrift -> parquet/snappy -> s3 is
the flow); the files are roughly the same size (96 files per 10-minute window):

17/04/05 16:43:43 INFO TaskSetManager: Finished task 72.0 in stage 7.0 (TID 
722) in 10089 ms on ip-172-20-213-64.us-west-2.compute.internal (executor 57) 
(1/96)
17/04/05 16:43:43 INFO TaskSetManager: Finished task 58.0 in stage 7.0 (TID 
680) in 10099 ms on ip-172-20-218-229.us-west-2.compute.internal (executor 90) 
(2/96)
17/04/05 16:43:43 INFO TaskSetManager: Finished task 81.0 in stage 7.0 (TID 
687) in 10244 ms on ip-172-20-218-144.us-west-2.compute.internal (executor 8) 
(3/96)
17/04/05 16:43:43 INFO TaskSetManager: Finished task 23.0 in stage 7.0 (TID 
736) in 10236 ms on ip-172-20-209-248.us-west-2.compute.internal (executor 82) 
(4/96)
17/04/05 16:43:43 INFO TaskSetManager: Finished task 52.0 in stage 7.0 (TID 
730) in 10275 ms on ip-172-20-218-144.us-west-2.compute.internal (executor 78) 
(5/96)
17/04/05 16:43:43 INFO TaskSetManager: Finished task 45.0 in stage 7.0 (TID 
691) in 10289 ms on ip-172-20-215-172.us-west-2.compute.internal (executor 41) 
(6/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 13.0 in stage 7.0 (TID 
712) in 10532 ms on ip-172-20-223-100.us-west-2.compute.internal (executor 65) 
(7/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 42.0 in stage 7.0 (TID 
694) in 10595 ms on ip-172-20-208-230.us-west-2.compute.internal (executor 18) 
(8/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 2.0 in stage 7.0 (TID 763) 
in 10623 ms on ip-172-20-208-230.us-west-2.compute.internal (executor 74) (9/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 82.0 in stage 7.0 (TID 
727) in 10631 ms on ip-172-20-212-76.us-west-2.compute.internal (executor 72) 
(10/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 69.0 in stage 7.0 (TID 
729) in 10716 ms on ip-172-20-215-172.us-west-2.compute.internal (executor 55) 
(11/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 65.0 in stage 7.0 (TID 
673) in 10733 ms on ip-172-20-217-201.us-west-2.compute.internal (executor 67) 
(12/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 15.0 in stage 7.0 (TID 
684) in 10737 ms on ip-172-20-213-64.us-west-2.compute.internal (executor 85) 
(13/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 27.0 in stage 7.0 (TID 
748) in 10747 ms on ip-172-20-217-201.us-west-2.compute.internal (executor 10) 
(14/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 46.0 in stage 7.0 (TID 
699) in 10834 ms on ip-172-20-218-229.us-west-2.compute.internal (executor 48) 
(15/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 6.0 in stage 7.0 (TID 719) 
in 10838 ms on ip-172-20-211-125.us-west-2.compute.internal (executor 52) 
(16/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 11.0 in stage 7.0 (TID 
739) in 10892 ms on ip-172-20-215-172.us-west-2.compute.internal (executor 83) 
(17/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 88.0 in stage 7.0 (TID 
697) in 10900 ms on ip-172-20-212-43.us-west-2.compute.internal (executor 70) 
(18/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 35.0 in stage 7.0 (TID 
678) in 10909 ms on ip-172-20-212-63.us-west-2.compute.internal (executor 77) 
(19/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID 700) 
in 10906 ms on ip-172-20-208-230.us-west-2.compute.internal (executor 46) 
(20/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 36.0 in stage 7.0 (TID 
732) in 10935 ms on ip-172-20-215-172.us-west-2.compute.internal (executor 69) 
(21/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 19.0 in stage 7.0 (TID 
759) in 10948 ms on ip-172-20-223-100.us-west-2.compute.internal (executor 37) 
(22/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 41.0 in stage 7.0 (TID 
703) in 11013 ms on ip-172-20-217-201.us-west-2.compute.internal (executor 81) 
(23/96)
17/04/05 16:43:44 INFO TaskSetManager: Finished task 8.0 in stage 7.0 (TID 745) 
in 11007 ms on ip-172-20-215-172.us-west-2.compute.internal (executor 13) 
(24/96)
17/04/05 16:43:44 INFO TaskSetManager: Finish

Re: Spark streaming + kafka error with json library

2017-03-30 Thread Srikanth
Thanks for the tip. That worked. When would one use the assembly?

On Wed, Mar 29, 2017 at 7:13 PM, Tathagata Das 
wrote:

> Try depending on "spark-streaming-kafka-0-10_2.11" (not the assembly)
>
> On Wed, Mar 29, 2017 at 9:59 AM, Srikanth  wrote:
>
>> Hello,
>>
>> I'm trying to use "org.json4s" % "json4s-native" library in a spark
>> streaming + kafka direct app.
>> When I use the latest version of the lib I get an error similar to this
>> <https://github.com/json4s/json4s/issues/316>
>> The workaround suggested there is to use version 3.2.10, as Spark has a
>> hard dependency on this version.
>>
>> I forced this version in SBT with
>> dependencyOverrides += "org.json4s" %% "json4s-native" % "3.2.10"
>>
>> But now it seems to have some conflict with spark-streaming-kafka-0-10-ass
>> embly
>>
>> [error] (*:assembly) deduplicate: different file contents found in the
>> following:
>>
>> [error] C:\Users\stati\.ivy2\cache\org.apache.spark\spark-streaming-
>> kafka-0-10-assembly_2.11\jars\spark-streaming-kafka-0-10-
>> assembly_2.11-2.1.0.jar:scala/util/parsing/combinator/Implic
>> itConversions$$anonfun$flatten2$1.class
>>
>> [error] C:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-pars
>> er-combinators_2.11\bundles\scala-parser-combinators_2.11-
>> 1.0.4.jar:scala/util/parsing/combinator/ImplicitConversions
>> $$anonfun$flatten2$1.class
>>
>> DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling
>> json4s-native.
>> Any idea how to resolve this? I'm using spark version 2.1.0
>>
>> Thanks,
>> Srikanth
>>
>
>


Re: Spark streaming + kafka error with json library

2017-03-29 Thread Tathagata Das
Try depending on "spark-streaming-kafka-0-10_2.11" (not the assembly)

On Wed, Mar 29, 2017 at 9:59 AM, Srikanth  wrote:

> Hello,
>
> I'm trying to use "org.json4s" % "json4s-native" library in a spark
> streaming + kafka direct app.
> When I use the latest version of the lib I get an error similar to this
> <https://github.com/json4s/json4s/issues/316>
> The workaround suggested there is to use version 3.2.10, as Spark has a
> hard dependency on this version.
>
> I forced this version in SBT with
> dependencyOverrides += "org.json4s" %% "json4s-native" % "3.2.10"
>
> But now it seems to have some conflict with spark-streaming-kafka-0-10-
> assembly
>
> [error] (*:assembly) deduplicate: different file contents found in the
> following:
>
> [error] C:\Users\stati\.ivy2\cache\org.apache.spark\spark-
> streaming-kafka-0-10-assembly_2.11\jars\spark-streaming-
> kafka-0-10-assembly_2.11-2.1.0.jar:scala/util/parsing/combinator/
> ImplicitConversions$$anonfun$flatten2$1.class
>
> [error] C:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-
> parser-combinators_2.11\bundles\scala-parser-combinators_2.11-1.0.4.jar:
> scala/util/parsing/combinator/ImplicitConversions$$anonfun$
> flatten2$1.class
>
> DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling
> json4s-native.
> Any idea how to resolve this? I'm using spark version 2.1.0
>
> Thanks,
> Srikanth
>


Spark streaming + kafka error with json library

2017-03-29 Thread Srikanth
Hello,

I'm trying to use "org.json4s" % "json4s-native" library in a spark
streaming + kafka direct app.
When I use the latest version of the lib I get an error similar to this
<https://github.com/json4s/json4s/issues/316>
The workaround suggested there is to use version 3.2.10, as Spark has a hard
dependency on this version.

I forced this version in SBT with
dependencyOverrides += "org.json4s" %% "json4s-native" % "3.2.10"

But now it seems to have some conflict with
spark-streaming-kafka-0-10-assembly

[error] (*:assembly) deduplicate: different file contents found in the
following:

[error]
C:\Users\stati\.ivy2\cache\org.apache.spark\spark-streaming-kafka-0-10-assembly_2.11\jars\spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class

[error]
C:\Users\stati\.ivy2\cache\org.scala-lang.modules\scala-parser-combinators_2.11\bundles\scala-parser-combinators_2.11-1.0.4.jar:scala/util/parsing/combinator/ImplicitConversions$$anonfun$flatten2$1.class

DependencyTree didn't show spark-streaming-kafka-0-10-assembly pulling
json4s-native.
Any idea how to resolve this? I'm using spark version 2.1.0

Thanks,
Srikanth
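
A minimal build.sbt sketch of the fix suggested in the replies above (dropping
the -assembly artifact in favor of the plain connector); the version numbers
follow the ones mentioned in this thread, and the "provided" scopes are an
assumption:

// Depend on the regular connector, not spark-streaming-kafka-0-10-assembly.
libraryDependencies += "org.apache.spark" %% "spark-sql"                  % "2.1.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming"            % "2.1.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"

// Pin json4s to the version Spark itself depends on, as described above.
dependencyOverrides += "org.json4s" %% "json4s-native" % "3.2.10"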


[ Spark Streaming & Kafka 0.10 ] Possible bug

2017-03-22 Thread Afshartous, Nick

Hi,

I think I'm seeing a bug in the context of upgrading to the Kafka 0.10
streaming API.  Code fragments follow.
--
   Nick

JavaInputDStream<ConsumerRecord<String, String>> rawStream =
        getDirectKafkaStream();

JavaDStream<Tuple2<String, String>> messagesTuple = rawStream.map(
        new Function<ConsumerRecord<String, String>, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                final String hyphen = "-";
                final String topicPartition = record.partition()
                        + hyphen + record.offset();

                return new Tuple2<>(topicPartition, record.value());
            }
        }
);

messagesTuple.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, String>>>() {
        @Override
        public void call(JavaRDD<Tuple2<String, String>> rdd) throws Exception {
            List<Tuple2<String, String>> list = rdd.take(10);

            for (Tuple2<String, String> pair : list) {
                log.info("messages tuple key: " + pair._1() + " : " + pair._2());
            }
        }
    }
);


The above foreachRDD logs output correctly.

17/03/22 15:57:01 INFO StreamingKafkaConsumerDriver: messages tuple key: 
-13-231599504 : �2017-03-22 15:54:05.568628$�g� 
ClientDev_Perf0585965449a1d3524b9e68396X@6eda8a884567b3442be68282b35aeeafMaterialReviewSinglePlayer`?��@�Vwin��@1.0.1703.0Unlabeled
 Stable�8���Not ApplicableNot ApplicableNot 
ApplicabledayMR_Day01Empty�

... implements PairFlatMapFunction<Iterator<Tuple2<String, byte[]>>, String, String> {
...

@Override
public Iterator<Tuple2<String, String>> call(Iterator<Tuple2<String, byte[]>> messages)
        throws Exception {

    while (messages.hasNext()) {
        Tuple2<String, byte[]> record = messages.next();
        String topicPartitionOffset = record._1();
        byte[] val = record._2();  // Line 113 <<< ClassCastException

   ...



Re: [Spark Streaming+Kafka][How-to]

2017-03-22 Thread Cody Koeninger
Glad you got it worked out.  That's cool as long as your use case doesn't
actually require e.g. partition 0 to always be scheduled to the same
executor across different batches.

On Tue, Mar 21, 2017 at 7:35 PM, OUASSAIDI, Sami 
wrote:

> So it worked quite well with a coalesce; I was able to find a solution to
> my problem: although not directly handling the executor, a good workaround
> was to assign the desired partition to a specific stream through the assign
> strategy and coalesce to a single partition, then repeat the same process
> for the remaining topics on different streams and, at the end, do a union
> of these streams.
>
> PS: No shuffle was made during the whole thing, since the RDD partitions
> were collapsed to a single one
>
> On Mar 17, 2017 at 8:04 PM, "Michael Armbrust"  wrote:
>
>> Another option that would avoid a shuffle would be to use assign and
>> coalesce, running two separate streams.
>>
>> spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "...")
>>   .option("assign", """{t0: {"0": }, t1:{"0": x}}""")
>>   .load()
>>   .coalesce(1)
>>   .writeStream
>>   .foreach(... code to write to cassandra ...)
>>
>> spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "...")
>>   .option("assign", """{t0: {"1": }, t1:{"1": x}}""")
>>   .load()
>>   .coalesce(1)
>>   .writeStream
>>   .foreach(... code to write to cassandra ...)
>>
>> On Fri, Mar 17, 2017 at 7:35 AM, OUASSAIDI, Sami > > wrote:
>>
>>> @Cody : Duly noted.
>>> @Michael Armbrust : A repartition is out of the question for our project
>>> as it would be a fairly expensive operation. We tried looking into
>>> targeting a specific executor so as to avoid this extra cost and directly
>>> have well-partitioned data after consuming the kafka topics. Also, we are
>>> using Spark streaming to save to the cassandra DB and try to keep shuffle
>>> operations to a strict minimum (at best none). As of now we are not
>>> entirely pleased with our current performance; that's why I'm doing a
>>> kafka topic sharding POC, and getting the executor to handle the specified
>>> partitions is central.
>>>
>>> 2017-03-17 9:14 GMT+01:00 Michael Armbrust :
>>>
 Sorry, typo.  Should be a repartition not a groupBy.


> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("subscribe", "t0,t1")
>   .load()
>   .repartition($"partition")
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>

>>>
>>>
>>> --
>>> *Mind7 Consulting*
>>>
>>> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
>>> __
>>>
>>> 64 Rue Taitbout, 75009 Paris
>>>
>>
>>


Re: [Spark Streaming+Kafka][How-to]

2017-03-21 Thread OUASSAIDI, Sami
So it worked quite well with a coalesce; I was able to find a solution to
my problem: although not directly handling the executor, a good workaround
was to assign the desired partition to a specific stream through the assign
strategy and coalesce to a single partition, then repeat the same process
for the remaining topics on different streams and, at the end, do a union
of these streams.

PS: No shuffle was made during the whole thing, since the RDD partitions
were collapsed to a single one
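
A hedged sketch of that shape, borrowing the structured streaming "assign"
option from Michael's snippets quoted below (broker address, topic names, and
partition ids are placeholders, and the documented JSON form of "assign" is
used):

// One stream per partition id, each collapsed to a single Spark partition,
// then unioned; no shuffle, since every input is already one partition.
val p0 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("assign", """{"t0":[0],"t1":[0]}""")
  .load()
  .coalesce(1)

val p1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("assign", """{"t0":[1],"t1":[1]}""")
  .load()
  .coalesce(1)

val merged = p0.union(p1) // write out with writeStream as in the snippets below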

On Mar 17, 2017 at 8:04 PM, "Michael Armbrust"  wrote:

> Another option that would avoid a shuffle would be to use assign and
> coalesce, running two separate streams.
>
> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("assign", """{t0: {"0": }, t1:{"0": x}}""")
>   .load()
>   .coalesce(1)
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>
> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("assign", """{t0: {"1": }, t1:{"1": x}}""")
>   .load()
>   .coalesce(1)
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>
> On Fri, Mar 17, 2017 at 7:35 AM, OUASSAIDI, Sami 
> wrote:
>
>> @Cody : Duly noted.
>> @Michael Armbrust : A repartition is out of the question for our project
>> as it would be a fairly expensive operation. We tried looking into
>> targeting a specific executor so as to avoid this extra cost and directly
>> have well-partitioned data after consuming the kafka topics. Also, we are
>> using Spark streaming to save to the cassandra DB and try to keep shuffle
>> operations to a strict minimum (at best none). As of now we are not
>> entirely pleased with our current performance; that's why I'm doing a
>> kafka topic sharding POC, and getting the executor to handle the specified
>> partitions is central.
>>
>> 2017-03-17 9:14 GMT+01:00 Michael Armbrust :
>>
>>> Sorry, typo.  Should be a repartition not a groupBy.
>>>
>>>
 spark.readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "...")
   .option("subscribe", "t0,t1")
   .load()
   .repartition($"partition")
   .writeStream
   .foreach(... code to write to cassandra ...)

>>>
>>
>>
>> --
>> *Mind7 Consulting*
>>
>> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
>> __
>>
>> 64 Rue Taitbout, 75009 Paris
>>
>
>


Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread Michael Armbrust
Another option that would avoid a shuffle would be to use assign and
coalesce, running two separate streams.

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("assign", """{t0: {"0": }, t1:{"0": x}}""")
  .load()
  .coalesce(1)
  .writeStream
  .foreach(... code to write to cassandra ...)

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("assign", """{t0: {"1": }, t1:{"1": x}}""")
  .load()
  .coalesce(1)
  .writeStream
  .foreach(... code to write to cassandra ...)

On Fri, Mar 17, 2017 at 7:35 AM, OUASSAIDI, Sami 
wrote:

> @Cody : Duly noted.
> @Michael Armbrust : A repartition is out of the question for our project as
> it would be a fairly expensive operation. We tried looking into targeting a
> specific executor so as to avoid this extra cost and directly have
> well-partitioned data after consuming the kafka topics. Also, we are using
> Spark streaming to save to the cassandra DB and try to keep shuffle
> operations to a strict minimum (at best none). As of now we are not entirely
> pleased with our current performance; that's why I'm doing a kafka topic
> sharding POC, and getting the executor to handle the specified partitions is
> central.
>
> 2017-03-17 9:14 GMT+01:00 Michael Armbrust :
>
>> Sorry, typo.  Should be a repartition not a groupBy.
>>
>>
>>> spark.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "...")
>>>   .option("subscribe", "t0,t1")
>>>   .load()
>>>   .repartition($"partition")
>>>   .writeStream
>>>   .foreach(... code to write to cassandra ...)
>>>
>>
>
>
> --
> *Mind7 Consulting*
>
> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
> __
>
> 64 Rue Taitbout, 75009 Paris
>


Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread OUASSAIDI, Sami
@Cody : Duly noted.
@Michael Armbrust : A repartition is out of the question for our project as
it would be a fairly expensive operation. We tried looking into targeting a
specific executor so as to avoid this extra cost and directly have
well-partitioned data after consuming the kafka topics. Also, we are using
Spark streaming to save to the cassandra DB and try to keep shuffle operations
to a strict minimum (at best none). As of now we are not entirely pleased with
our current performance; that's why I'm doing a kafka topic sharding POC, and
getting the executor to handle the specified partitions is central.

2017-03-17 9:14 GMT+01:00 Michael Armbrust :

> Sorry, typo.  Should be a repartition not a groupBy.
>
>
>> spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "...")
>>   .option("subscribe", "t0,t1")
>>   .load()
>>   .repartition($"partition")
>>   .writeStream
>>   .foreach(... code to write to cassandra ...)
>>
>


-- 
*Mind7 Consulting*

Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
__

64 Rue Taitbout, 75009 Paris


Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread Michael Armbrust
Sorry, typo.  Should be a repartition not a groupBy.


> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("subscribe", "t0,t1")
>   .load()
>   .repartition($"partition")
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>


Re: [Spark Streaming+Kafka][How-to]

2017-03-16 Thread Michael Armbrust
I think it should be straightforward to express this using structured
streaming.  You could ensure that data from a given partition ID is
processed serially by performing a group by on the partition column.

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "t0,t1")
  .load()
  .groupBy($"partition")
  .writeStream
  .foreach(... code to write to cassandra ...)


On Thu, Mar 16, 2017 at 8:10 AM, Cody Koeninger  wrote:

> Spark just really isn't a good fit for trying to pin particular
> computation to a particular executor, especially if you're relying on that
> for correctness.
>
> On Thu, Mar 16, 2017 at 7:16 AM, OUASSAIDI, Sami 
> wrote:
>
>>
>> Hi all,
>>
>> I need to specify how an executor should consume data from a kafka
>> topic.
>>
>> Let's say I have 2 topics, t0 and t1, with two partitions each, and two
>> executors e0 and e1 (both can be on the same node, so the assign strategy
>> does not work: on a multi-executor node it works on round-robin
>> scheduling, and whichever executor is available first consumes the topic
>> partition).
>>
>> What I would like to do is make e0 consume partition 0 from both t0 and
>> t1 while e1 consumes partition 1 from t0 and t1. Is there no way around
>> it except messing with scheduling? If so, what's the best approach?
>>
>> The reason for doing so is that the executors will write to a cassandra
>> database, and since we will be in a parallelized context one executor might
>> "collide" with another and data would be lost; by assigning a
>> partition I want to force each executor to process its data sequentially.
>>
>> Thanks
>> Sami
>> --
>> *Mind7 Consulting*
>>
>> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
>> __
>>
>> 64 Rue Taitbout, 75009 Paris
>>
>
>


Re: [Spark Streaming+Kafka][How-to]

2017-03-16 Thread Cody Koeninger
Spark just really isn't a good fit for trying to pin particular computation
to a particular executor, especially if you're relying on that for
correctness.

On Thu, Mar 16, 2017 at 7:16 AM, OUASSAIDI, Sami 
wrote:

>
> Hi all,
>
> I need to specify how an executor should consume data from a kafka
> topic.
>
> Let's say I have 2 topics, t0 and t1, with two partitions each, and two
> executors e0 and e1 (both can be on the same node, so the assign strategy
> does not work: on a multi-executor node it works on round-robin
> scheduling, and whichever executor is available first consumes the topic
> partition).
>
> What I would like to do is make e0 consume partition 0 from both t0 and t1
> while e1 consumes partition 1 from t0 and t1. Is there no way around it
> except messing with scheduling? If so, what's the best approach?
>
> The reason for doing so is that the executors will write to a cassandra
> database, and since we will be in a parallelized context one executor might
> "collide" with another and data would be lost; by assigning a
> partition I want to force each executor to process its data sequentially.
>
> Thanks
> Sami
> --
> *Mind7 Consulting*
>
> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
> __
>
> 64 Rue Taitbout, 75009 Paris
>


[Spark Streaming+Kafka][How-to]

2017-03-16 Thread OUASSAIDI, Sami
Hi all,

I need to specify how an executor should consume data from a kafka topic.

Let's say I have 2 topics, t0 and t1, with two partitions each, and two
executors e0 and e1 (both can be on the same node, so the assign strategy does
not work: on a multi-executor node it works on round-robin scheduling, and
whichever executor is available first consumes the topic partition).

What I would like to do is make e0 consume partition 0 from both t0 and t1
while e1 consumes partition 1 from t0 and t1. Is there no way around it
except messing with scheduling? If so, what's the best approach?

The reason for doing so is that the executors will write to a cassandra
database, and since we will be in a parallelized context one executor might
"collide" with another and data would be lost; by assigning a
partition I want to force each executor to process its data sequentially.

Thanks
Sami
-- 
*Mind7 Consulting*

Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
__

64 Rue Taitbout, 75009 Paris


Re: Exception in spark streaming + kafka direct app

2017-02-07 Thread Srikanth
This is running in YARN cluster mode. It was restarted automatically and
continued fine.
I was trying to see what went wrong. AFAIK there were no task failures.
Nothing in the executor logs. The log I gave is from the driver.

After some digging, I did see that there was a rebalance in the kafka logs
around this time. So will the driver fail and exit in such cases?
I've seen drivers exit after a job has hit max retry attempts. This is
different though, right?

Srikanth


On Tue, Feb 7, 2017 at 5:25 PM, Tathagata Das 
wrote:

> Does restarting after a few minutes solve the problem? Could be a
> transient issue that lasts long enough for spark task-level retries to all
> fail.
>
> On Tue, Feb 7, 2017 at 4:34 PM, Srikanth  wrote:
>
>> Hello,
>>
>> I had a spark streaming app that reads from kafka; it ran for a few hours,
>> after which it failed with this error:
>>
>> *17/02/07 20:04:10 ERROR JobScheduler: Error generating jobs for time 
>> 148649785 ms
>> java.lang.IllegalStateException: No current assignment for partition 
>> mt_event-5
>>  at 
>> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
>>  at 
>> org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
>>  at 
>> org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
>>  at 
>> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
>>  at 
>> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>>  at 
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>  at 
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)*
>>
>> 
>> 
>>
>> 17/02/07 20:04:10 ERROR ApplicationMaster: User class threw exception: 
>> java.lang.IllegalStateException: No current assignment for partition 
>> mt_event-5
>> java.lang.IllegalStateException: No current assignment for partition 
>> mt_event-5
>>
>> 
>> 
>>
>> 17/02/07 20:05:00 WARN JobGenerator: Timed out while stopping the job 
>> generator (timeout = 5)
>>
>>
>> The driver did not recover from this error and failed. The previous batch had
>> run 5 sec earlier. There are no indications in the logs that a rebalance happened.
>> As per the kafka admin, the kafka cluster's health was good when this happened
>> and no maintenance was being done.
>>
>> Any idea what could have gone wrong and why this is a fatal error?
>>
>> Regards,
>> Srikanth
>>
>>
>


Re: Exception in spark streaming + kafka direct app

2017-02-07 Thread Tathagata Das
Does restarting after a few minutes solve the problem? Could be a
transient issue that lasts long enough for spark task-level retries to all
fail.

On Tue, Feb 7, 2017 at 4:34 PM, Srikanth  wrote:

> Hello,
>
> I had a spark streaming app that reads from kafka; it ran for a few hours,
> after which it failed with this error:
>
> *17/02/07 20:04:10 ERROR JobScheduler: Error generating jobs for time 
> 148649785 ms
> java.lang.IllegalStateException: No current assignment for partition 
> mt_event-5
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)*
>
> 
> 
>
> 17/02/07 20:04:10 ERROR ApplicationMaster: User class threw exception: 
> java.lang.IllegalStateException: No current assignment for partition 
> mt_event-5
> java.lang.IllegalStateException: No current assignment for partition 
> mt_event-5
>
> 
> 
>
> 17/02/07 20:05:00 WARN JobGenerator: Timed out while stopping the job 
> generator (timeout = 5)
>
>
> The driver did not recover from this error and failed. The previous batch had
> run 5 sec earlier. There are no indications in the logs that a rebalance happened.
> As per the kafka admin, the kafka cluster's health was good when this happened
> and no maintenance was being done.
>
> Any idea what could have gone wrong and why this is a fatal error?
>
> Regards,
> Srikanth
>
>


Exception in spark streaming + kafka direct app

2017-02-07 Thread Srikanth
Hello,

I had a spark streaming app that reads from kafka; it ran for a few hours,
after which it failed with this error:

*17/02/07 20:04:10 ERROR JobScheduler: Error generating jobs for time
148649785 ms
java.lang.IllegalStateException: No current assignment for partition mt_event-5
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)*




17/02/07 20:04:10 ERROR ApplicationMaster: User class threw exception:
java.lang.IllegalStateException: No current assignment for partition
mt_event-5
java.lang.IllegalStateException: No current assignment for partition mt_event-5




17/02/07 20:05:00 WARN JobGenerator: Timed out while stopping the job
generator (timeout = 5)


The driver did not recover from this error and failed. The previous
batch had run 5 sec earlier. There are no indications in the logs that a
rebalance happened.
As per the kafka admin, the kafka cluster's health was good when this
happened and no maintenance was being done.

Any idea what could have gone wrong and why this is a fatal error?

Regards,
Srikanth


Re: spark streaming kafka connector questions

2016-09-16 Thread 毅程
Thanks, that is what I was missing. I have added cache() before the actions,
and that 2nd processing is avoided.

2016-09-10 5:10 GMT-07:00 Cody Koeninger :

> Hard to say without seeing the code, but if you do multiple actions on an
> Rdd without caching, the Rdd will be computed multiple times.
>
> On Sep 10, 2016 2:43 AM, "Cheng Yi"  wrote:
>
> After some investigation, the problem I see is likely caused by a filter and
> union of the dstream.
> If I just do kafka-stream -- process -- output, then there is no
> problem: each event is fetched once.
> If I do
> kafka-stream -- process(1) --+-- filter stream A ------------------+
>                              |                                     +-- A union B -- output process(3)
>                              +-- filter stream B -- process(2) ----+
> the event is fetched 2 times; the duplicate message starts processing at the
> end of process(1). See the following traces:
>
> 16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*
>
> 16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
> 192.168.2.6:9092 (id: 2147483647 rack: null) for group
> spark-executor-testgid.
>
> log of processing (1) for event 1
>
> 16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
> 1401 bytes result sent to driver
>
> 16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
> 36) in 3494 ms on localhost (3/3)
>
> 16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
> have all completed, from pool
>
> 16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
> (*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s
>
> 16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages
>
> 16/09/10 00:11:03 INFO DAGScheduler: running: Set()
>
> 16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
> ResultStage 11)
>
> 16/09/10 00:11:03 INFO DAGScheduler: failed: Set()
>
> 16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
> (UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which
> has no missing parents
>
> 16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
> ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)
>
> 16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
> partition 2 offsets 1 -> 2
>
> 16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-testgid log-analysis-topic 2 1 ( *(fetch the same EVENT 2nd
> time)*)
>
> 16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
> 147349146 ms.0 from job set of time 147349146 ms
>
> 16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
> 147349146 ms (execution: 10.874 s)* (EVENT 1st time process cost 10.874
> s)*
>
> 16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
> 1473491465000 ms.0 from job set of time 1473491465000 ms
>
> 16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
> 1473491465000 ms (execution: 0.066 s) *(EVENT 2nd time process cost 0.066)*
>
> and the 2nd time processing of the event finished without really doing the
> work.
>
> Help is hugely appreciated.
>
>
>


Re: spark streaming kafka connector questions

2016-09-10 Thread Cody Koeninger
Hard to say without seeing the code, but if you do multiple actions on an
Rdd without caching, the Rdd will be computed multiple times.
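
A minimal sketch of what that means for the filter/union shape described below
(all names are illustrative):

import org.apache.spark.streaming.dstream.DStream

// `lines` stands in for the mapped Kafka direct stream; without cache(),
// each of the two filters below recomputes it, re-fetching the same offsets.
def wireUp(lines: DStream[String]): Unit = {
  val processed = lines.map(_.trim)            // stands in for process(1)
  processed.cache()                            // <- avoids the second fetch

  val a = processed.filter(_.startsWith("A"))  // stream A
  val b = processed.filter(_.startsWith("B"))  // stream B; process(2) omitted
  a.union(b).foreachRDD(rdd => rdd.foreach(println))  // output process(3)
}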

On Sep 10, 2016 2:43 AM, "Cheng Yi"  wrote:

After some investigation, the problem I see is likely caused by a filter and
union of the dstream.
If I just do kafka-stream -- process -- output, then there is no
problem: each event is fetched once.
If I do
kafka-stream -- process(1) --+-- filter stream A ------------------+
                             |                                     +-- A union B -- output process(3)
                             +-- filter stream B -- process(2) ----+
the event is fetched 2 times; the duplicate message starts processing at the
end of process(1). See the following traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*

16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
192.168.2.6:9092 (id: 2147483647 rack: null) for group
spark-executor-testgid.

log of processing (1) for event 1

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
1401 bytes result sent to driver

16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
36) in 3494 ms on localhost (3/3)

16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
have all completed, from pool

16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
(*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s

16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages

16/09/10 00:11:03 INFO DAGScheduler: running: Set()

16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
ResultStage 11)

16/09/10 00:11:03 INFO DAGScheduler: failed: Set()

16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
(UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which
has no missing parents

16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)

16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
partition 2 offsets 1 -> 2

16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 ( *(fetch the same EVENT 2nd
time)*)

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
147349146 ms.0 from job set of time 147349146 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
147349146 ms (execution: 10.874 s)* (EVENT 1st time process cost 10.874
s)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491465000 ms.0 from job set of time 1473491465000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
1473491465000 ms (execution: 0.066 s) *(EVENT 2nd time process cost 0.066)*

and the 2nd time processing of the event finished without really doing the
work.

Help is hugely appreciated.





Re: spark streaming kafka connector questions

2016-09-10 Thread Cheng Yi
After some investigation, the problem I see is likely caused by a filter and
union of the dstream.
If I just do kafka-stream -- process -- output, then there is no
problem: each event is fetched once.
If I do
kafka-stream -- process(1) --+-- filter stream A ------------------+
                             |                                     +-- A union B -- output process(3)
                             +-- filter stream B -- process(2) ----+
the event is fetched 2 times; the duplicate message starts processing at the
end of process(1). See the following traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*

16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
192.168.2.6:9092 (id: 2147483647 rack: null) for group
spark-executor-testgid.

log of processing (1) for event 1

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
1401 bytes result sent to driver

16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
36) in 3494 ms on localhost (3/3)

16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
have all completed, from pool 

16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
(*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s

16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages

16/09/10 00:11:03 INFO DAGScheduler: running: Set()

16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
ResultStage 11)

16/09/10 00:11:03 INFO DAGScheduler: failed: Set()

16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
(UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which
has no missing parents

16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)

16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
partition 2 offsets 1 -> 2

16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 ( *(fetch the same EVENT 2nd
time)*)

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
147349146 ms.0 from job set of time 147349146 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
147349146 ms (execution: 10.874 s)* (EVENT 1st time process cost 10.874
s)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491465000 ms.0 from job set of time 1473491465000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
1473491465000 ms (execution: 0.066 s) *(EVENT 2nd time process cost 0.066)*

and the 2nd time processing of the event finished without really doing the
work.

Help is hugely appreciated.






Re: spark streaming kafka connector questions

2016-09-10 Thread 毅程
  String.class, String.class,
> StringDecoder.class, StringDecoder.class,
> > kafkaParams, topicsSet);
> >
> > 2. After I get a message from the kafka stream via the consumer, how
> can I
> > commit the message without finishing the whole processing (the whole
> processing
> > might take minutes)? It looks like I can't get the consumer from the
> KafkaUtils
> > to execute the kafka commit API.
> >
> > 3. If I can't do the manual commit, then I need to tell the Kafka consumer
> > to allow a longer session or to auto commit. For v0.8 or v0.9, I have tried
> to pass
> > the following properties to KafkaUtils:
> >
> > kafkaParams.put("auto.commit.enable", "true");
> > kafkaParams.put("auto.commit.interval.ms", "1000");
> > kafkaParams.put("zookeeper.session.timeout.ms", "6");
> > kafkaParams.put("zookeeper.connection.timeout.ms", "6");
> >
> > Still not working.
> > Help is greatly appreciated !
> >
> >
> >
> >


Re: spark streaming kafka connector questions

2016-09-08 Thread Cody Koeninger
- If you're seeing repeated attempts to process the same message, you
should be able to look in the UI or logs and see that a task has
failed.  Figure out why that task failed before chasing other things.

- You're not using the latest version; the latest version is for Spark
2.0.  There are two versions of the connector for Spark 2.0, one for
Kafka 0.8 or higher, and one for Kafka 0.10 or higher.

- Committing individual messages to Kafka doesn't make any sense;
Spark streaming deals with batches.  If you're doing any aggregations
that involve shuffling, there isn't even a guarantee that you'll
process messages in order for a given topicpartition.

- Auto commit has no effect for the 0.8 version of createDirectStream.
Turning it on for the 0.10 version of createDirectStream is a really
bad idea; it will give you undefined delivery semantics, because the
commit to Kafka is unrelated to whether the batch processed
successfully.

If you're unclear on how the kafka integration works, see

https://github.com/koeninger/kafka-exactly-once
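
For the 0.10 integration mentioned above, batch-level offset commits look
roughly like this (a sketch; the topic, group id, and bootstrap address are
placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

def run(ssc: StreamingContext): Unit = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers"  -> "broker:9092",
    "key.deserializer"   -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id"           -> "example-group",
    "enable.auto.commit" -> (false: java.lang.Boolean))

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](Seq("topic"), kafkaParams))

  stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... process the batch here ...
    // Commit only after the whole batch succeeds; commits are per batch
    // of offset ranges, not per individual message.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}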

On Thu, Sep 8, 2016 at 4:44 PM, Cheng Yi  wrote:
> I am using the latest streaming kafka connector:
>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming-kafka_2.11</artifactId>
>   <version>1.6.2</version>
> </dependency>
>
> I am facing the problem that a message is delivered two times to my
> consumers. These two deliveries are 10+ seconds apart; it looks like this is
> caused by my lengthy message processing (about 60 seconds). I have
> tried to solve this, but I am still stuck.
>
> 1. It looks like the kafka streaming connector supports kafka v0.8 and maybe
> v0.9, but not v0.10.
>
> JavaPairInputDStream<String, String> ds = KafkaUtils.createDirectStream(jsc,
> String.class, String.class,
> StringDecoder.class, StringDecoder.class,
> kafkaParams, topicsSet);
>
> 2. After I get a message from the kafka stream via the consumer, how can I
> commit the message without finishing the whole processing (the whole processing
> might take minutes)? It looks like I can't get the consumer from the KafkaUtils
> to execute the kafka commit API.
>
> 3. If I can't do the manual commit, then I need to tell the Kafka consumer to
> allow a longer session or to auto commit. For v0.8 or v0.9, I have tried to pass
> the following properties to KafkaUtils:
>
> kafkaParams.put("auto.commit.enable", "true");
> kafkaParams.put("auto.commit.interval.ms", "1000");
> kafkaParams.put("zookeeper.session.timeout.ms", "6");
> kafkaParams.put("zookeeper.connection.timeout.ms", "6");
>
> Still not working.
> Help is greatly appreciated!
>
>
>
>



spark streaming kafka connector questions

2016-09-08 Thread Cheng Yi
I am using the latest streaming kafka connector:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.11</artifactId>
  <version>1.6.2</version>
</dependency>

I am facing the problem that a message is delivered two times to my
consumers. These two deliveries are 10+ seconds apart; it looks like this is
caused by my lengthy message processing (about 60 seconds). I have
tried to solve this, but I am still stuck.

1. It looks like the kafka streaming connector supports kafka v0.8 and maybe
v0.9, but not v0.10.

JavaPairInputDStream<String, String> ds = KafkaUtils.createDirectStream(jsc,
String.class, String.class,
StringDecoder.class, StringDecoder.class,
kafkaParams, topicsSet);

2. After I get a message from the kafka stream via the consumer, how can I
commit the message without finishing the whole processing (the whole processing
might take minutes)? It looks like I can't get the consumer from the KafkaUtils
to execute the kafka commit API.

3. If I can't do the manual commit, then I need to tell the Kafka consumer to
allow a longer session or to auto commit. For v0.8 or v0.9, I have tried to pass
the following properties to KafkaUtils:

kafkaParams.put("auto.commit.enable", "true");
kafkaParams.put("auto.commit.interval.ms", "1000");
kafkaParams.put("zookeeper.session.timeout.ms", "6");
kafkaParams.put("zookeeper.connection.timeout.ms", "6");

Still not working.
Help is greatly appreciated!







Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread kevin
Thank you. I can't find a spark-streaming-kafka_2.10 jar for Spark 2 on Maven
Central, so I tried version 1.6.2; it doesn't work, since it needs the class
org.apache.spark.Logging, which can't be found in Spark 2. So I built the
spark-streaming-kafka_2.10 jar for Spark 2 from the source code. It works now.

2016-07-26 2:12 GMT+08:00 Cody Koeninger :

> For 2.0, the kafka dstream support is in two separate subprojects
> depending on which version of Kafka you are using
>
> spark-streaming-kafka-0-10
> or
> spark-streaming-kafka-0-8
>
> corresponding to brokers that are version 0.10+ or 0.8+
>
> On Mon, Jul 25, 2016 at 12:29 PM, Reynold Xin  wrote:
> > The presentation at Spark Summit SF was probably referring to Structured
> > Streaming. The existing Spark Streaming (dstream) in Spark 2.0 has the
> same
> > production stability level as Spark 1.6. There is also Kafka 0.10
> support in
> > dstream.
> >
> > On July 25, 2016 at 10:26:49 AM, Andy Davidson
> > (a...@santacruzintegration.com) wrote:
> >
> > Hi Kevin
> >
> > Just a heads up: at the recent Spark Summit in S.F. there was a
> > presentation about streaming in 2.0. They said that streaming was not
> > going to be production ready in 2.0.
> >
> > I am not sure if the older 1.6.x version will be supported. My project
> will
> > not be able to upgrade with streaming support. We also use kafka
> >
> > Andy
> >
> > From: Marco Mistroni 
> > Date: Monday, July 25, 2016 at 2:33 AM
> > To: kevin 
> > Cc: "user @spark" , "dev.spark"
> > 
> > Subject: Re: where I can find spark-streaming-kafka for spark2.0
> >
> > Hi Kevin
> >   you should not need to rebuild everything.
> > Instead, i believe you should launch spark-submit by specifying the kafka
> > jar file in your --packages... i had to follow same when integrating
> spark
> > streaming with flume
> >
> >   have you checked this link ?
> > https://spark.apache.org/docs/latest/streaming-kafka-integration.html
> >
> >
> > hth
> >
> >
> >
> > On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:
> >>
> >> I have compiled it from source code
> >>
> >> 2016-07-25 12:05 GMT+08:00 kevin :
> >>>
> >>> Hi all:
> >>> I try to run the example
> >>> org.apache.spark.examples.streaming.KafkaWordCount,
> >>> and I get this error:
> >>> Exception in thread "main" java.lang.NoClassDefFoundError:
> >>> org/apache/spark/streaming/kafka/KafkaUtils$
> >>> at
> >>>
> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
> >>> at
> >>>
> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>> at
> >>>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>> at
> >>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>> at java.lang.reflect.Method.invoke(Method.java:498)
> >>> at
> >>>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
> >>> at
> >>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> >>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> >>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
> >>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> >>> Caused by: java.lang.ClassNotFoundException:
> >>> org.apache.spark.streaming.kafka.KafkaUtils$
> >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >>> ... 11 more
> >>>
> >>> so where can I find spark-streaming-kafka for spark2.0?
> >>
> >>
> >
>


Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread Cody Koeninger
For 2.0, the kafka dstream support is in two separate subprojects
depending on which version of Kafka you are using

spark-streaming-kafka-0-10
or
spark-streaming-kafka-0-8

corresponding to brokers that are version 0.10+ or 0.8+
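
In sbt terms (a sketch; the 2.0.0 version string is an assumption matching
the Spark 2.0 discussion in this thread):

// Pick exactly one, matching your Kafka broker version:
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8"  % "2.0.0"
// or
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0"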

On Mon, Jul 25, 2016 at 12:29 PM, Reynold Xin  wrote:
> The presentation at Spark Summit SF was probably referring to Structured
> Streaming. The existing Spark Streaming (dstream) in Spark 2.0 has the same
> production stability level as Spark 1.6. There is also Kafka 0.10 support in
> dstream.
>
> On July 25, 2016 at 10:26:49 AM, Andy Davidson
> (a...@santacruzintegration.com) wrote:
>
> Hi Kevin
>
> Just a heads up: at the recent Spark Summit in S.F. there was a presentation
> about streaming in 2.0. They said that streaming was not going to be
> production ready in 2.0.
>
> I am not sure if the older 1.6.x version will be supported. My project will
> not be able to upgrade with streaming support. We also use kafka
>
> Andy
>
> From: Marco Mistroni 
> Date: Monday, July 25, 2016 at 2:33 AM
> To: kevin 
> Cc: "user @spark" , "dev.spark"
> 
> Subject: Re: where I can find spark-streaming-kafka for spark2.0
>
> Hi Kevin
>   you should not need to rebuild everything.
> Instead, I believe you should launch spark-submit specifying the Kafka
> jar file in your --packages... I had to do the same when integrating Spark
> streaming with Flume
>
>   have you checked this link ?
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
>
> hth
>
>
>
> On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:
>>
>> I have compiled it from source code
>>
>> 2016-07-25 12:05 GMT+08:00 kevin :
>>>
>>> Hi all:
>>> I try to run the example org.apache.spark.examples.streaming.KafkaWordCount,
>>> and I get this error:
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>> at
>>> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
>>> at
>>> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.spark.streaming.kafka.KafkaUtils$
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> ... 11 more
>>>
>>> so where can I find spark-streaming-kafka for spark2.0?
>>
>>
>




Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread Andy Davidson
Hi Kevin

Just a heads up: at the recent Spark Summit in S.F. there was a presentation
about streaming in 2.0. They said that streaming was not going to be production
ready in 2.0.

I am not sure if the older 1.6.x version will be supported. My project will
not be able to upgrade with streaming support. We also use kafka

Andy

From:  Marco Mistroni 
Date:  Monday, July 25, 2016 at 2:33 AM
To:  kevin 
Cc:  "user @spark" , "dev.spark"

Subject:  Re: where I can find spark-streaming-kafka for spark2.0

> Hi Kevin
>   you should not need to rebuild everything.
> Instead, I believe you should launch spark-submit specifying the Kafka jar
> file in your --packages... I had to do the same when integrating Spark
> streaming with Flume
> 
>   have you checked this link ?
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
> 
> 
> hth
> 
>   
> 
> On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:
>> I have compiled it from source code.
>> 
>> 2016-07-25 12:05 GMT+08:00 kevin :
>>> Hi all,
>>> I tried to run the example org.apache.spark.examples.streaming.KafkaWordCount,
>>> and I got this error:
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/streaming/kafka/KafkaUtils$
>>> at org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
>>> at org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.spark.streaming.kafka.KafkaUtils$
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> ... 11 more
>>> 
>>> so where can I find spark-streaming-kafka for Spark 2.0?
>> 
> 




Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread Reynold Xin
The presentation at Spark Summit SF was probably referring to Structured
Streaming. The existing Spark Streaming (dstream) in Spark 2.0 has the same
production stability level as Spark 1.6. There is also Kafka 0.10 support
in dstream.
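For reference, a minimal sketch of the 0.10 dstream API (the
spark-streaming-kafka-0-10 artifact); the broker address, topic name, group id,
and the existing StreamingContext ssc are assumptions for illustration:

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

  // assumed broker address and consumer group, for illustration only
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "broker1:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "example-group")

  // one direct stream; RDD partitions mirror the Kafka partitions
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))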

On July 25, 2016 at 10:26:49 AM, Andy Davidson (
a...@santacruzintegration.com) wrote:

Hi Kevin

Just a heads up: at the recent Spark Summit in S.F. there was a presentation
about streaming in 2.0. They said that streaming was not going to be
production ready in 2.0.

I am not sure if the older 1.6.x version will be supported. My project will
not be able to upgrade without streaming support. We also use Kafka.

Andy

From: Marco Mistroni 
Date: Monday, July 25, 2016 at 2:33 AM
To: kevin 
Cc: "user @spark" , "dev.spark" 
Subject: Re: where I can find spark-streaming-kafka for spark2.0

Hi Kevin,
  you should not need to rebuild everything.
Instead, I believe you should launch spark-submit specifying the Kafka
jar file in your --packages... I had to do the same when integrating Spark
Streaming with Flume.

  Have you checked this link?
https://spark.apache.org/docs/latest/streaming-kafka-integration.html


hth



On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:

> I have compiled it from source code.
>
> 2016-07-25 12:05 GMT+08:00 kevin :
>
>> Hi all,
>> I tried to run the example org.apache.spark.examples.streaming.KafkaWordCount,
>> and I got this error:
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>> at
>> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
>> at
>> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka.KafkaUtils$
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 11 more
>>
>> so where can I find spark-streaming-kafka for Spark 2.0?
>>
>
>


Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread Marco Mistroni
Hi Kevin,
  you should not need to rebuild everything.
Instead, I believe you should launch spark-submit specifying the Kafka
jar file in your --packages... I had to do the same when integrating Spark
Streaming with Flume.

  Have you checked this link?
https://spark.apache.org/docs/latest/streaming-kafka-integration.html


hth
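For example, something like this (a sketch, assuming Scala 2.11, Spark 2.0.0,
and your own application jar; KafkaWordCount's arguments are
<zkQuorum> <group> <topics> <numThreads>):

  spark-submit \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 \
    --class org.apache.spark.examples.streaming.KafkaWordCount \
    your-app.jar zk-host:2181 my-group my-topic 1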



On Mon, Jul 25, 2016 at 10:20 AM, kevin  wrote:

> I have compiled it from source code.
>
> 2016-07-25 12:05 GMT+08:00 kevin :
>
>> Hi all,
>> I tried to run the example org.apache.spark.examples.streaming.KafkaWordCount,
>> and I got this error:
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/kafka/KafkaUtils$
>> at
>> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
>> at
>> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka.KafkaUtils$
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> ... 11 more
>>
>> so where can I find spark-streaming-kafka for Spark 2.0?
>>
>
>


Re: where I can find spark-streaming-kafka for spark2.0

2016-07-25 Thread kevin
I have compiled it from source code.

2016-07-25 12:05 GMT+08:00 kevin :

> Hi all,
> I tried to run the example org.apache.spark.examples.streaming.KafkaWordCount,
> and I got this error:
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/kafka/KafkaUtils$
> at
> org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
> at
> org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.streaming.kafka.KafkaUtils$
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 11 more
>
> so where can I find spark-streaming-kafka for Spark 2.0?
>


where I can find spark-streaming-kafka for spark2.0

2016-07-24 Thread kevin
Hi all,
I tried to run the example org.apache.spark.examples.streaming.KafkaWordCount,
and I got this error:
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/kafka/KafkaUtils$
at
org.apache.spark.examples.streaming.KafkaWordCount$.main(KafkaWordCount.scala:57)
at
org.apache.spark.examples.streaming.KafkaWordCount.main(KafkaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka.KafkaUtils$
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 11 more

so where can I find spark-streaming-kafka for Spark 2.0?
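In Spark 2.0 the Kafka integration ships as a separate artifact rather than
inside the examples jar; a minimal build sketch, assuming sbt, Scala 2.11 and
Spark 2.0.0:

  // build.sbt — a minimal sketch, assuming Scala 2.11 and Spark 2.0.0
  scalaVersion := "2.11.8"
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided",
    "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0")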


Re: Spark streaming Kafka Direct API + Multiple consumers

2016-07-07 Thread Rabin Banerjee
It's not required:

*Simplified Parallelism:* No need to create multiple input Kafka streams
and union them. With directStream, Spark Streaming will create as many RDD
partitions as there are Kafka partitions to consume, which will all read
data from Kafka in parallel. So there is a one-to-one mapping between Kafka
and RDD partitions, which is easier to understand and tune.
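A minimal sketch of that single-stream setup (assuming the 0-8 direct API, a
broker at broker1:9092, and an existing StreamingContext ssc — all placeholders):

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  // assumed broker address, for illustration only
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  // one stream is enough: it yields as many RDD partitions as Kafka partitions
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("my-topic"))
  stream.foreachRDD(rdd => println(s"RDD partitions this batch: ${rdd.getNumPartitions}"))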
On Jul 7, 2016 3:04 PM, "SamyaMaiti"  wrote:

> Hi Team,
>
> Is there a way we can consume from Kafka using the Spark Streaming direct API
> with multiple consumers (belonging to the same consumer group)?
>
> Regards,
> Sam
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-Kafka-Direct-API-Multiple-consumers-tp27305.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark streaming Kafka Direct API + Multiple consumers

2016-07-07 Thread SamyaMaiti
Hi Team,

Is there a way we can consume from Kafka using the Spark Streaming direct API
with multiple consumers (belonging to the same consumer group)?

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-Kafka-Direct-API-Multiple-consumers-tp27305.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Kafka Direct Approach: re-compute from specific time

2016-05-25 Thread trung kien
Ah, right, I see.

Thank you very much.
On May 25, 2016 11:11 AM, "Cody Koeninger"  wrote:

> There's an overloaded createDirectStream method that takes a map from
> topicpartition to offset for the starting point of the stream.
>
> On Wed, May 25, 2016 at 9:59 AM, trung kien  wrote:
> > Thanks, Cody.
> >
> > I can build the mapping from time -> offset. However, how can I pass this
> > offset to the Spark Streaming job? (using the Direct Approach)
> >
> > On May 25, 2016 9:42 AM, "Cody Koeninger"  wrote:
> >>
> >> Kafka does not yet have meaningful time indexing, there's a kafka
> >> improvement proposal for it but it has gotten pushed back to at least
> >> 0.10.1
> >>
> >> If you want to do this kind of thing, you will need to maintain your
> >> own index from time to offset.
> >>
> >> On Wed, May 25, 2016 at 8:15 AM, trung kien  wrote:
> >> > Hi all,
> >> >
> >> > Is there any way to re-compute using the Spark Streaming - Kafka Direct
> >> > Approach from a specific time?
> >> >
> >> > In some cases, I want to re-compute again from a specific time (e.g. the
> >> > beginning of the day). Is that possible?
> >> >
> >> >
> >> >
> >> > --
> >> > Thanks
> >> > Kien
>


Re: Spark Streaming - Kafka Direct Approach: re-compute from specific time

2016-05-25 Thread Cody Koeninger
There's an overloaded createDirectStream method that takes a map from
topicpartition to offset for the starting point of the stream.
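A minimal sketch of that overload (assuming the 0-8 direct API and the usual
kafkaParams map; the topic, partition, and offset values are placeholders you
would look up in your own index):

  import kafka.common.TopicAndPartition
  import kafka.message.MessageAndMetadata
  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  // placeholder offsets — in practice, looked up from your own time -> offset index
  val fromOffsets = Map(TopicAndPartition("my-topic", 0) -> 1234L)
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
    (String, String)](
    ssc, kafkaParams, fromOffsets,
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))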

On Wed, May 25, 2016 at 9:59 AM, trung kien  wrote:
> Thanks, Cody.
>
> I can build the mapping from time -> offset. However, how can I pass this
> offset to the Spark Streaming job? (using the Direct Approach)
>
> On May 25, 2016 9:42 AM, "Cody Koeninger"  wrote:
>>
>> Kafka does not yet have meaningful time indexing, there's a kafka
>> improvement proposal for it but it has gotten pushed back to at least
>> 0.10.1
>>
>> If you want to do this kind of thing, you will need to maintain your
>> own index from time to offset.
>>
>> On Wed, May 25, 2016 at 8:15 AM, trung kien  wrote:
>> > Hi all,
>> >
>> > Is there any way to re-compute using the Spark Streaming - Kafka Direct
>> > Approach from a specific time?
>> >
>> > In some cases, I want to re-compute again from a specific time (e.g. the
>> > beginning of the day). Is that possible?
>> >
>> >
>> >
>> > --
>> > Thanks
>> > Kien

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming - Kafka Direct Approach: re-compute from specific time

2016-05-25 Thread trung kien
Thanks, Cody.

I can build the mapping from time -> offset. However, how can I pass this
offset to the Spark Streaming job? (using the Direct Approach)
On May 25, 2016 9:42 AM, "Cody Koeninger"  wrote:

> Kafka does not yet have meaningful time indexing, there's a kafka
> improvement proposal for it but it has gotten pushed back to at least
> 0.10.1
>
> If you want to do this kind of thing, you will need to maintain your
> own index from time to offset.
>
> On Wed, May 25, 2016 at 8:15 AM, trung kien  wrote:
> > Hi all,
> >
> > Is there any way to re-compute using the Spark Streaming - Kafka Direct
> > Approach from a specific time?
> >
> > In some cases, I want to re-compute again from a specific time (e.g. the
> > beginning of the day). Is that possible?
> >
> >
> >
> > --
> > Thanks
> > Kien
>


Re: Spark Streaming - Kafka Direct Approach: re-compute from specific time

2016-05-25 Thread Cody Koeninger
Kafka does not yet have meaningful time indexing; there's a Kafka
Improvement Proposal for it, but it has gotten pushed back to at least
0.10.1.

If you want to do this kind of thing, you will need to maintain your
own index from time to offset.
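One way to build such an index (a sketch: record each batch's starting offsets
keyed by the batch time; saveIndex is a hypothetical helper that writes to your
own store):

  import org.apache.spark.streaming.kafka.HasOffsetRanges

  stream.foreachRDD { (rdd, batchTime) =>
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // saveIndex is hypothetical: persist (timestamp, topic, partition, fromOffset) rows
    saveIndex(batchTime.milliseconds,
      ranges.map(r => (r.topic, r.partition, r.fromOffset)))
  }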

On Wed, May 25, 2016 at 8:15 AM, trung kien  wrote:
> Hi all,
>
> Is there any way to re-compute using the Spark Streaming - Kafka Direct
> Approach from a specific time?
>
> In some cases, I want to re-compute again from a specific time (e.g. the
> beginning of the day). Is that possible?
>
>
>
> --
> Thanks
> Kien

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming - Kafka - java.nio.BufferUnderflowException

2016-05-25 Thread Cody Koeninger
I'd fix the kafka version on the executor classpath (should be
0.8.2.1) before trying anything else, even if it may be unrelated to
the actual error.  Definitely don't upgrade your brokers to 0.9.
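One possible way to pin the executor classpath (a sketch; the jar path is a
placeholder, and spark.executor.userClassPathFirst is an experimental flag —
verify it against your CDH setup before relying on it):

  spark-submit \
    --jars /path/to/kafka_2.10-0.8.2.1.jar \
    --conf spark.executor.userClassPathFirst=true \
    ...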

On Wed, May 25, 2016 at 2:30 AM, Scott W  wrote:
> I'm running into the below error while trying to consume messages from Kafka
> through Spark Streaming (Kafka direct API). This used to work OK when using
> the Spark standalone cluster manager. We're just switching to Cloudera 5.7,
> using YARN to manage the Spark cluster, and started to see the below error.
>
> Few details:
> - Spark 1.6.0
> - Using Kafka direct stream API
> - Kafka broker version (0.8.2.1)
> - Kafka version in the classpath of Yarn executors (0.9)
> - Kafka brokers not managed by Cloudera
>
> The only difference I see between using the standalone cluster manager and
> YARN is the Kafka version being used on the consumer end (0.8.2.1 vs 0.9).
>
> Trying to figure out if the version mismatch is really an issue? If that is
> indeed the case, what would be the fix for this, other than upgrading the
> Kafka brokers to 0.9 as well (eventually yes, but not for now)? Or is there
> something else I'm missing here?
>
> Appreciate the help.
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
> stage 200.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 200.0 (TID 203,..): java.nio.BufferUnderflowException
> at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
> at java.nio.ByteBuffer.get(ByteBuffer.java:715)
> at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40)
> at kafka.api.TopicData$.readFrom(FetchResponse.scala:96)
> at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
> at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.immutable.Range.foreach(Range.scala:141)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at
> org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942)
> at
> org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> at
>

Spark Streaming - Kafka Direct Approach: re-compute from specific time

2016-05-25 Thread trung kien
Hi all,

Is there any way to re-compute using the Spark Streaming - Kafka Direct
Approach from a specific time?

In some cases, I want to re-compute again from a specific time (e.g. the
beginning of the day). Is that possible?



-- 
Thanks
Kien


Spark Streaming - Kafka - java.nio.BufferUnderflowException

2016-05-25 Thread Scott W
I'm running into the below error while trying to consume messages from Kafka
through Spark Streaming (Kafka direct API). This used to work OK when using
the Spark standalone cluster manager. We're just switching to Cloudera 5.7,
using YARN to manage the Spark cluster, and started to see the below error.

Few details:
- Spark 1.6.0
- Using Kafka direct stream API
- Kafka broker version (0.8.2.1)
- Kafka version in the classpath of Yarn executors (0.9)
- Kafka brokers not managed by Cloudera

The only difference I see between using the standalone cluster manager and
YARN is the Kafka version being used on the consumer end (0.8.2.1 vs 0.9).

*Trying to figure out if the version mismatch is really an issue? If that is
indeed the case, what would be the fix for this, other than upgrading the
Kafka brokers to 0.9 as well (eventually yes, but not for now)? Or is there
something else I'm missing here?*

Appreciate the help.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 200.0 failed 4 times, most recent failure: Lost task 0.3 in stage
200.0 (TID 203,..): java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
at java.nio.ByteBuffer.get(ByteBuffer.java:715)
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40)
at kafka.api.TopicData$.readFrom(FetchResponse.scala:96)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942)
at
org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Jason Nerothin
Let me be more detailed in my response:

Kafka works on “at least once” semantics. Therefore, given your assumption that 
Kafka "will be operational", we can assume that at least once semantics will 
hold.

At this point, it comes down to designing for consumer (really Spark Executor) 
resilience.

From a DC standpoint, you can use an in-memory data fabric, like the one
provided by InsightEdge (http://insightedge.io/docs/010/index.html). In this
case, WAN replication out to other DCs is available at the Data Grid layer.
See here: http://www.gigaspaces.com/Data-Replication

Assuming that the consumers respect at least once semantics (that is: don’t 
attempt to keep track of the offset or any other state), then Spark can 
parallelize execution using Dumb Consumers. The backing data fabric can do what 
it does best, which is conflict resolution in the case that a DC goes down for 
a period of time.

One advantage of this architecture is that it can be used to load balance, 
reducing infrastructure costs.

Of course, the CAP theorem is still in play, so things like intra-DC latencies 
and consistency SLAs need to be considered. But in principle, you can balance 
competing concerns against one another based on business requirements.

HTH

> On Apr 19, 2016, at 10:53 AM, Erwan ALLAIN  wrote:
> 
> Cody, you're right, that was an example. The target architecture would be 3 DCs :)
> Good point on ZK, I'll have to check that.
> 
> About Spark, both instances will run at the same time but on different
> topics. It would be quite useless to have the 2 DCs working on the same set
> of data.
> I just want, in case of a crash, the healthy Spark cluster to work on all
> topics (taking over the dead cluster's load).
>
> Does it seem like an awkward design?
> 
> On Tue, Apr 19, 2016 at 5:38 PM, Cody Koeninger wrote:
> Maybe I'm missing something, but I don't see how you get a quorum in only 2 
> datacenters (without splitbrain problem, etc).  I also don't know how well ZK 
> will work cross-datacenter.
> 
> As far as the Spark side of things goes, if it's idempotent, why not just run
> both instances all the time?
> 
> 
> 
> On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN wrote:
> I'm describing disaster recovery, but it can also be used to take one datacenter
> offline for an upgrade, for instance.
> 
> From my point of view when DC2 crashes:
> 
> On Kafka side:
> - the Kafka cluster will lose one or more brokers (partition leaders and replicas)
> - lost partition leaders will be re-elected in the remaining healthy DC
> 
> => if the number of in-sync replicas is above the minimum threshold, Kafka
> should be operational
> 
> On downstream datastore side (say Cassandra for instance):
> - deploy across the 2 DCs in (QUORUM / QUORUM)
> - idempotent write
> 
> => it should be ok (depends on replication factor)
> 
> On Spark:
> - treatment should be idempotent, it will allow us to restart from the last 
> committed offset
> 
> I understand that starting up a post-crash job would work.
> 
> The question is: how can we detect when DC2 crashes, in order to start a new job?
> 
> Dynamic topic partitioning (at each KafkaRDD creation, for instance) + topic
> subscription may be the answer?
> 
> I appreciate your effort.
> 
> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin wrote:
> Is the main concern uptime or disaster recovery?
> 
>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger wrote:
>> 
>> I think the bigger question is what happens to Kafka and your downstream 
>> data store when DC2 crashes.
>> 
>> From a Spark point of view, starting up a post-crash job in a new data 
>> center isn't really different from starting up a post-crash job in the 
>> original data center.
>> 
>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN wrote:
>> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
>> 
>> As I mentioned before, I'm planning to use one Kafka cluster and 2 or more
>> distinct Spark clusters.
>> 
>> Let's say we have the following DCs configuration in a nominal case. 
>> Kafka partitions are consumed uniformly by the 2 datacenters.
>> 
>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>> DC 1         Master 1.1
>>              Worker 1.1   my_group               P1
>>              Worker 1.2   my_group               P2
>> DC 2         Master 2.1
>>              Worker 2.1   my_group               P3
>>              Worker 2.2   my_group               P4
>>
>> I would like, in case of a DC crash, a rebalancing of partitions onto the
>> healthy DC, something as follows:
>>
>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>> DC 1         Master 1.1
>>              Worker 1.1   my_group               P1, P3
>>              Worker 1.2   my_group               P2, P4
>> DC 2         Master 2.1
>>              Worker 2.1   my_group               P3
>>              Worker 2.2   my_group               P4
>> 
>> I woul

Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Jason Nerothin
At that scale, it’s best not to do coordination at the application layer. 

How much of your data is transactional in nature {all, some, none}? By which I 
mean ACID-compliant.

> On Apr 19, 2016, at 10:53 AM, Erwan ALLAIN  wrote:
> 
> Cody, you're right, that was an example. The target architecture would be 3 DCs :)
> Good point on ZK, I'll have to check that.
> 
> About Spark, both instances will run at the same time but on different
> topics. It would be quite useless to have the 2 DCs working on the same set
> of data.
> I just want, in case of a crash, the healthy Spark cluster to work on all
> topics (taking over the dead cluster's load).
>
> Does it seem like an awkward design?
> 
> On Tue, Apr 19, 2016 at 5:38 PM, Cody Koeninger wrote:
> Maybe I'm missing something, but I don't see how you get a quorum in only 2 
> datacenters (without splitbrain problem, etc).  I also don't know how well ZK 
> will work cross-datacenter.
> 
> As far as the Spark side of things goes, if it's idempotent, why not just run
> both instances all the time?
> 
> 
> 
> On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN wrote:
> I'm describing disaster recovery, but it can also be used to take one
> datacenter offline for an upgrade, for instance.
> 
> From my point of view when DC2 crashes:
> 
> On Kafka side:
> - the Kafka cluster will lose one or more brokers (partition leaders and replicas)
> - lost partition leaders will be re-elected in the remaining healthy DC
> 
> => if the number of in-sync replicas is above the minimum threshold, Kafka
> should be operational
> 
> On downstream datastore side (say Cassandra for instance):
> - deploy across the 2 DCs in (QUORUM / QUORUM)
> - idempotent write
> 
> => it should be ok (depends on replication factor)
> 
> On Spark:
> - treatment should be idempotent, it will allow us to restart from the last 
> committed offset
> 
> I understand that starting up a post-crash job would work.
> 
> The question is: how can we detect when DC2 crashes, in order to start a new job?
> 
> Dynamic topic partitioning (at each KafkaRDD creation, for instance) + topic
> subscription may be the answer?
> 
> I appreciate your effort.
> 
> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin wrote:
> Is the main concern uptime or disaster recovery?
> 
>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger wrote:
>> 
>> I think the bigger question is what happens to Kafka and your downstream 
>> data store when DC2 crashes.
>> 
>> From a Spark point of view, starting up a post-crash job in a new data 
>> center isn't really different from starting up a post-crash job in the 
>> original data center.
>> 
>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN wrote:
>> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
>> 
>> As I mentioned before, I'm planning to use one Kafka cluster and 2 or more
>> distinct Spark clusters.
>> 
>> Let's say we have the following DCs configuration in a nominal case. 
>> Kafka partitions are consumed uniformly by the 2 datacenters.
>> 
>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>> DC 1         Master 1.1
>>              Worker 1.1   my_group               P1
>>              Worker 1.2   my_group               P2
>> DC 2         Master 2.1
>>              Worker 2.1   my_group               P3
>>              Worker 2.2   my_group               P4
>>
>> I would like, in case of a DC crash, a rebalancing of partitions onto the
>> healthy DC, something as follows:
>>
>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>> DC 1         Master 1.1
>>              Worker 1.1   my_group               P1, P3
>>              Worker 1.2   my_group               P2, P4
>> DC 2         Master 2.1
>>              Worker 2.1   my_group               P3
>>              Worker 2.2   my_group               P4
>> 
>> I would like to know if it's possible:
>> - using consumer groups?
>> - using the direct approach? I prefer this one, as I don't want to activate the WAL.
>>
>> Hope the explanation is better!
>> 
>> 
>> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger wrote:
>> The current direct stream only handles exactly the partitions
>> specified at startup.  You'd have to restart the job if you changed
>> partitions.
>> 
>> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
>> towards using the Kafka 0.10 consumer, which would allow for dynamic
>> topic partitions.
>> 
>> Regarding your multi-DC questions, I'm not really clear on what you're 
>> saying.
>> 
>> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN wrote:
>> > Hello,
>> >
>> > I'm currently designing a solution where 2 distinct Spark clusters (2
>> > datacenters) share the same Kafka (Kafka rack aware or manual broker
>> > repartition).
>> > The aims are
>> > - preventing DC crash: using kafka resiliency and consumer group mechanism
>> > (or else ?)
>> > - keeping consistent

Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Erwan ALLAIN
Cody, you're right, that was an example. The target architecture would be 3 DCs
:) Good point on ZK, I'll have to check that.

About Spark, both instances will run at the same time but on different
topics. It would be quite useless to have the 2 DCs working on the same set
of data.
I just want, in case of a crash, the healthy Spark cluster to work on all
topics (taking over the dead cluster's load).

Does it seem like an awkward design?

On Tue, Apr 19, 2016 at 5:38 PM, Cody Koeninger  wrote:

> Maybe I'm missing something, but I don't see how you get a quorum in only
> 2 datacenters (without splitbrain problem, etc).  I also don't know how
> well ZK will work cross-datacenter.
>
> As far as the Spark side of things goes, if it's idempotent, why not just
> run both instances all the time?
>
>
>
> On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN 
> wrote:
>
>> I'm describing disaster recovery, but it can also be used to take one
>> datacenter offline for an upgrade, for instance.
>>
>> From my point of view when DC2 crashes:
>>
>> *On Kafka side:*
>> - the Kafka cluster will lose one or more brokers (partition leaders and
>> replicas)
>> - lost partition leaders will be re-elected in the remaining healthy DC
>>
>> => if the number of in-sync replicas is above the minimum threshold,
>> Kafka should be operational
>>
>> *On downstream datastore side (say Cassandra for instance):*
>> - deploy across the 2 DCs in (QUORUM / QUORUM)
>> - idempotent write
>>
>> => it should be ok (depends on replication factor)
>>
>> *On Spark*:
>> - treatment should be idempotent, it will allow us to restart from the
>> last committed offset
>>
>> I understand that starting up a post-crash job would work.
>>
>> The question is: how can we detect when DC2 crashes, in order to start a new job?
>>
>> Dynamic topic partitioning (at each KafkaRDD creation, for instance) + topic
>> subscription may be the answer?
>>
>> I appreciate your effort.
>>
>> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin 
>> wrote:
>>
>>> Is the main concern uptime or disaster recovery?
>>>
>>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger  wrote:
>>>
>>> I think the bigger question is what happens to Kafka and your downstream
>>> data store when DC2 crashes.
>>>
>>> From a Spark point of view, starting up a post-crash job in a new data
>>> center isn't really different from starting up a post-crash job in the
>>> original data center.
>>>
>>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN 
>>> wrote:
>>>
 Thanks Jason and Cody. I'll try to explain a bit better the Multi DC
 case.

 As I mentioned before, I'm planning to use one Kafka cluster and 2 or
 more distinct Spark clusters.

 Let's say we have the following DCs configuration in a nominal case.
 Kafka partitions are consumed uniformly by the 2 datacenters.

 DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
 DC 1         Master 1.1
              Worker 1.1   my_group               P1
              Worker 1.2   my_group               P2
 DC 2         Master 2.1
              Worker 2.1   my_group               P3
              Worker 2.2   my_group               P4

 I would like, in case of a DC crash, a rebalancing of partitions onto the
 healthy DC, something as follows:

 DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
 DC 1         Master 1.1
              Worker 1.1   my_group               P1, P3
              Worker 1.2   my_group               P2, P4
 DC 2         Master 2.1
              Worker 2.1   my_group               P3
              Worker 2.2   my_group               P4

 I would like to know if it's possible:
 - using consumer groups?
 - using the direct approach? I prefer this one, as I don't want to activate
 the WAL.

 Hope the explanation is better!


 On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger 
 wrote:

> The current direct stream only handles exactly the partitions
> specified at startup.  You'd have to restart the job if you changed
> partitions.
>
> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
> towards using the Kafka 0.10 consumer, which would allow for dynamic
> topic partitions.
>
> Regarding your multi-DC questions, I'm not really clear on what you're
> saying.
>
> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN <
> eallain.po...@gmail.com> wrote:
> > Hello,
> >
> > I'm currently designing a solution where 2 distinct Spark clusters (2
> > datacenters) share the same Kafka (Kafka rack aware or manual broker
> > repartition).
> > The aims are
> > - preventing DC crash: using kafka resiliency and consumer group
> mechanism
> > (or else ?)
> > - keeping consistent offsets among replicas (vs. MirrorMaker, which does
> > not keep offsets)
> >
> > I have several questions
> >
> > 1) Dynamic repartition (one or 2 DC)
> >
> > I'm using KafkaDirectStream, which maps one Kafka partition to one Spark
> > partition. Is it possible to handle new or removed partitions?
> > In the compute method, it looks like we are always using the
> currentOffset
> > map to 

Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Cody Koeninger
Maybe I'm missing something, but I don't see how you get a quorum in only 2
datacenters (without split-brain problems, etc.).  I also don't know how well
ZK will work cross-datacenter.

As far as the Spark side of things goes, if it's idempotent, why not just
run both instances all the time?



On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN 
wrote:

> I'm describing disaster recovery, but it can also be used to take one
> datacenter offline for an upgrade, for instance.
>
> From my point of view when DC2 crashes:
>
> *On Kafka side:*
> - the Kafka cluster will lose one or more brokers (partition leaders and replicas)
> - lost partition leaders will be re-elected in the remaining healthy DC
>
> => if the number of in-sync replicas is above the minimum threshold,
> Kafka should be operational
>
> *On downstream datastore side (say Cassandra for instance):*
> - deploy across the 2 DCs in (QUORUM / QUORUM)
> - idempotent write
>
> => it should be ok (depends on replication factor)
>
> *On Spark*:
> - treatment should be idempotent, it will allow us to restart from the
> last committed offset
>
> I understand that starting up a post-crash job would work.
>
> The question is: how can we detect when DC2 crashes, in order to start a new job?
>
> Dynamic topic partitioning (at each KafkaRDD creation, for instance) + topic
> subscription may be the answer?
>
> I appreciate your effort.
>
> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin 
> wrote:
>
>> Is the main concern uptime or disaster recovery?
>>
>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger  wrote:
>>
>> I think the bigger question is what happens to Kafka and your downstream
>> data store when DC2 crashes.
>>
>> From a Spark point of view, starting up a post-crash job in a new data
>> center isn't really different from starting up a post-crash job in the
>> original data center.
>>
>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN 
>> wrote:
>>
>>> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC
>>> case.
>>>
>>> As I mentioned before, I'm planning to use one Kafka cluster and 2 or
>>> more distinct Spark clusters.
>>>
>>> Let's say we have the following DCs configuration in a nominal case.
>>> Kafka partitions are consumed uniformly by the 2 datacenters.
>>>
>>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>>> DC 1         Master 1.1
>>>              Worker 1.1   my_group               P1
>>>              Worker 1.2   my_group               P2
>>> DC 2         Master 2.1
>>>              Worker 2.1   my_group               P3
>>>              Worker 2.2   my_group               P4
>>>
>>> I would like, in case of a DC crash, a rebalancing of partitions onto the
>>> healthy DC, something as follows:
>>>
>>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>>> DC 1         Master 1.1
>>>              Worker 1.1   my_group               P1, P3
>>>              Worker 1.2   my_group               P2, P4
>>> DC 2         Master 2.1
>>>              Worker 2.1   my_group               P3
>>>              Worker 2.2   my_group               P4
>>>
>>> I would like to know if it's possible:
>>> - using consumer groups?
>>> - using the direct approach? I prefer this one, as I don't want to activate
>>> the WAL.
>>>
>>> Hope the explanation is better!
>>>
>>>
>>> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger 
>>> wrote:
>>>
 The current direct stream only handles exactly the partitions
 specified at startup.  You'd have to restart the job if you changed
 partitions.

 https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
 towards using the Kafka 0.10 consumer, which would allow for dynamic
 topic partitions.

 Regarding your multi-DC questions, I'm not really clear on what you're
 saying.

 On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN 
 wrote:
 > Hello,
 >
 > I'm currently designing a solution where 2 distinct Spark clusters (2
 > datacenters) share the same Kafka (Kafka rack aware or manual broker
 > repartition).
 > The aims are
 > - preventing DC crash: using Kafka resiliency and the consumer group
 > mechanism (or else?)
 > - keeping consistent offsets among replicas (vs. MirrorMaker, which does
 > not keep offsets)
 >
 > I have several questions
 >
 > 1) Dynamic repartition (one or 2 DC)
 >
 > I'm using KafkaDirectStream, which maps one Kafka partition to one Spark
 > partition. Is it possible to handle new or removed partitions?
 > In the compute method, it looks like we are always using the currentOffset
 > map to query the next batch, and therefore it's always the same number of
 > partitions? Can we request metadata at each batch?
 >
 > 2) Multi DC Spark
 >
 > Using Direct approach, a way to achieve this would be
 > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
 > - only one is reading the partition (Check every x interval, "lock"
 stored
 > in cassandra for instance)
 >
 > => not sure if it works just an idea
 >
 > Using Consumer Group
 > - CommitOffset manually at the end of the batch
 >
 > => Does Spark handle partition rebalancing?
 >
 > I'd appreciate a

Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Erwan ALLAIN
I'm describing disaster recovery, but it can also be used to take one
datacenter offline for an upgrade, for instance.

From my point of view when DC2 crashes:

*On Kafka side:*
- the Kafka cluster will lose one or more brokers (partition leaders and replicas)
- lost partition leaders will be re-elected in the remaining healthy DC

=> if the number of in-sync replicas is above the minimum threshold, Kafka
should be operational

*On downstream datastore side (say Cassandra for instance):*
- deploy across the 2 DCs in (QUORUM / QUORUM)
- idempotent write

=> it should be ok (depends on replication factor)

*On Spark*:
- treatment should be idempotent; it will allow us to restart from the last
committed offset
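A minimal sketch of that pattern with the 0-8 direct stream (writeIdempotently
and commitOffsets are hypothetical helpers for your own datastore):

  import org.apache.spark.streaming.kafka.HasOffsetRanges

  stream.foreachRDD { rdd =>
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    writeIdempotently(rdd)   // hypothetical: e.g. an upsert into Cassandra
    commitOffsets(ranges)    // hypothetical: persist the offset ranges atomically
  }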

I understand that starting up a post-crash job would work.

The question is: how can we detect when DC2 crashes, in order to start a new job?

Dynamic topic partitioning (at each KafkaRDD creation, for instance) + topic
subscription may be the answer?

I appreciate your effort.

On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin 
wrote:

> Is the main concern uptime or disaster recovery?
>
> On Apr 19, 2016, at 9:12 AM, Cody Koeninger  wrote:
>
> I think the bigger question is what happens to Kafka and your downstream
> data store when DC2 crashes.
>
> From a Spark point of view, starting up a post-crash job in a new data
> center isn't really different from starting up a post-crash job in the
> original data center.
>
> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN 
> wrote:
>
>> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
>>
>> As I mentioned before, I'm planning to use one Kafka cluster and 2 or
>> more distinct Spark clusters.
>>
>> Let's say we have the following DCs configuration in a nominal case.
>> Kafka partitions are consumed uniformly by the 2 datacenters.
>>
>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>> DC 1         Master 1.1
>>              Worker 1.1   my_group               P1
>>              Worker 1.2   my_group               P2
>> DC 2         Master 2.1
>>              Worker 2.1   my_group               P3
>>              Worker 2.2   my_group               P4
>>
>> I would like, in case of a DC crash, a rebalancing of partitions onto the
>> healthy DC, something as follows:
>>
>> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
>> DC 1         Master 1.1
>>              Worker 1.1   my_group               P1, P3
>>              Worker 1.2   my_group               P2, P4
>> DC 2         Master 2.1
>>              Worker 2.1   my_group               P3
>>              Worker 2.2   my_group               P4
>>
>> I would like to know if it's possible:
>> - using consumer groups?
>> - using the direct approach? I prefer this one, as I don't want to activate
>> the WAL.
>>
>> Hope the explanation is better!
>>
>>
>> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger 
>> wrote:
>>
>>> The current direct stream only handles exactly the partitions
>>> specified at startup.  You'd have to restart the job if you changed
>>> partitions.
>>>
>>> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
>>> towards using the Kafka 0.10 consumer, which would allow for dynamic
>>> topic partitions.
>>>
>>> Regarding your multi-DC questions, I'm not really clear on what you're
>>> saying.
>>>
>>> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN 
>>> wrote:
>>> > Hello,
>>> >
>>> > I'm currently designing a solution where 2 distinct Spark clusters (2
>>> > datacenters) share the same Kafka (Kafka rack aware or manual broker
>>> > repartition).
>>> > The aims are
>>> > - preventing DC crash: using Kafka resiliency and the consumer group
>>> > mechanism (or else?)
>>> > - keeping consistent offsets among replicas (vs. MirrorMaker, which does
>>> > not keep offsets)
>>> >
>>> > I have several questions
>>> >
>>> > 1) Dynamic repartition (one or 2 DC)
>>> >
>>> > I'm using KafkaDirectStream, which maps one Kafka partition to one Spark
>>> > partition. Is it possible to handle new or removed partitions?
>>> > In the compute method, it looks like we are always using the currentOffset
>>> > map to query the next batch, and therefore it's always the same number of
>>> > partitions? Can we request metadata at each batch?
>>> >
>>> > 2) Multi DC Spark
>>> >
>>> > Using Direct approach, a way to achieve this would be
>>> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
>>> > - only one is reading the partition (Check every x interval, "lock"
>>> stored
>>> > in cassandra for instance)
>>> >
>>> > => not sure if it works just an idea
>>> >
>>> > Using Consumer Group
>>> > - CommitOffset manually at the end of the batch
>>> >
>>> > => Does Spark handle partition rebalancing?
>>> >
>>> > I'd appreciate any ideas! Let me know if it's not clear.
>>> >
>>> > Erwan
>>> >
>>> >
>>>
>>
>>
>
>


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Jason Nerothin
Is the main concern uptime or disaster recovery?

> On Apr 19, 2016, at 9:12 AM, Cody Koeninger  wrote:
> 
> I think the bigger question is what happens to Kafka and your downstream data 
> store when DC2 crashes.
> 
> From a Spark point of view, starting up a post-crash job in a new data center 
> isn't really different from starting up a post-crash job in the original data 
> center.
> 
> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN wrote:
> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
> 
> As I mentioned before, I'm planning to use one Kafka cluster and 2 or more
> distinct Spark clusters.
> 
> Let's say we have the following DCs configuration in a nominal case. 
> Kafka partitions are consumed uniformly by the 2 datacenters.
> 
> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
> DC 1         Master 1.1
>              Worker 1.1   my_group               P1
>              Worker 1.2   my_group               P2
> DC 2         Master 2.1
>              Worker 2.1   my_group               P3
>              Worker 2.2   my_group               P4
>
> I would like, in case of a DC crash, a rebalancing of partitions onto the
> healthy DC, something as follows:
>
> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
> DC 1         Master 1.1
>              Worker 1.1   my_group               P1, P3
>              Worker 1.2   my_group               P2, P4
> DC 2         Master 2.1
>              Worker 2.1   my_group               P3
>              Worker 2.2   my_group               P4
> 
> I would like to know if it's possible:
> - using consumer groups?
> - using the direct approach? I prefer this one, as I don't want to activate the WAL.
>
> Hope the explanation is better!
> 
> 
> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger wrote:
> The current direct stream only handles exactly the partitions
> specified at startup.  You'd have to restart the job if you changed
> partitions.
> 
> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
> towards using the Kafka 0.10 consumer, which would allow for dynamic
> topic partitions.
> 
> Regarding your multi-DC questions, I'm not really clear on what you're saying.
> 
> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN wrote:
> > Hello,
> >
> > I'm currently designing a solution where 2 distinct Spark clusters (2
> > datacenters) share the same Kafka (Kafka rack aware or manual broker
> > repartition).
> > The aims are
> > - preventing DC crash: using Kafka resiliency and the consumer group mechanism
> > (or else?)
> > - keeping consistent offsets among replicas (vs. MirrorMaker, which does not
> > keep offsets)
> >
> > I have several questions
> >
> > 1) Dynamic repartition (one or 2 DC)
> >
> > I'm using KafkaDirectStream, which maps one Kafka partition to one Spark
> > partition. Is it possible to handle new or removed partitions?
> > In the compute method, it looks like we are always using the currentOffset
> > map to query the next batch, and therefore it's always the same number of
> > partitions? Can we request metadata at each batch?
> >
> > 2) Multi DC Spark
> >
> > Using Direct approach, a way to achieve this would be
> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
> > - only one is reading the partition (Check every x interval, "lock" stored
> > in cassandra for instance)
> >
> > => not sure if it works just an idea
> >
> > Using Consumer Group
> > - CommitOffset manually at the end of the batch
> >
> > => Does Spark handle partition rebalancing?
> >
> > I'd appreciate any ideas! Let me know if it's not clear.
> >
> > Erwan
> >
> >
> 
> 



Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Cody Koeninger
I think the bigger question is what happens to Kafka and your downstream
data store when DC2 crashes.

From a Spark point of view, starting up a post-crash job in a new data
center isn't really different from starting up a post-crash job in the
original data center.

On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN 
wrote:

> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
>
> As I mentioned before, I'm planning to use one Kafka cluster and 2 or
> more distinct Spark clusters.
>
> Let's say we have the following DCs configuration in a nominal case.
> Kafka partitions are consumed uniformly by the 2 datacenters.
>
> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
> DC 1         Master 1.1
>              Worker 1.1   my_group               P1
>              Worker 1.2   my_group               P2
> DC 2         Master 2.1
>              Worker 2.1   my_group               P3
>              Worker 2.2   my_group               P4
>
> I would like, in case of a DC crash, a rebalancing of partitions onto the
> healthy DC, something as follows:
>
> DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
> DC 1         Master 1.1
>              Worker 1.1   my_group               P1, P3
>              Worker 1.2   my_group               P2, P4
> DC 2         Master 2.1
>              Worker 2.1   my_group               P3
>              Worker 2.2   my_group               P4
>
> I would like to know if it's possible:
> - using consumer groups?
> - using the direct approach? I prefer this one, as I don't want to activate
> the WAL.
>
> Hope the explanation is better!
>
>
> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger 
> wrote:
>
>> The current direct stream only handles exactly the partitions
>> specified at startup.  You'd have to restart the job if you changed
>> partitions.
>>
>> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
>> towards using the Kafka 0.10 consumer, which would allow for dynamic
>> topic partitions.
>>
>> Regarding your multi-DC questions, I'm not really clear on what you're
>> saying.
>>
>> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN 
>> wrote:
>> > Hello,
>> >
>> > I'm currently designing a solution where 2 distinct Spark clusters (2
>> > datacenters) share the same Kafka (Kafka rack aware or manual broker
>> > repartition).
>> > The aims are
>> > - preventing DC crash: using Kafka resiliency and the consumer group
>> > mechanism (or else?)
>> > - keeping consistent offsets among replicas (vs. MirrorMaker, which does
>> > not keep offsets)
>> >
>> > I have several questions
>> >
>> > 1) Dynamic repartition (one or 2 DC)
>> >
>> > I'm using KafkaDirectStream, which maps one Kafka partition to one Spark
>> > partition. Is it possible to handle new or removed partitions?
>> > In the compute method, it looks like we are always using the currentOffset
>> > map to query the next batch, and therefore it's always the same number of
>> > partitions? Can we request metadata at each batch?
>> >
>> > 2) Multi DC Spark
>> >
>> > Using Direct approach, a way to achieve this would be
>> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
>> > - only one is reading the partition (Check every x interval, "lock"
>> stored
>> > in cassandra for instance)
>> >
>> > => not sure if it works just an idea
>> >
>> > Using Consumer Group
>> > - CommitOffset manually at the end of the batch
>> >
>> > => Does Spark handle partition rebalancing?
>> >
>> > I'd appreciate any ideas! Let me know if it's not clear.
>> >
>> > Erwan
>> >
>> >
>>
>
>


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Erwan ALLAIN
Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.

As I mentioned before, I'm planning to use one Kafka cluster and 2 or more
distinct Spark clusters.

Let's say we have the following DCs configuration in a nominal case.
Kafka partitions are consumed uniformly by the 2 datacenters.

DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
DC 1         Master 1.1
             Worker 1.1   my_group               P1
             Worker 1.2   my_group               P2
DC 2         Master 2.1
             Worker 2.1   my_group               P3
             Worker 2.2   my_group               P4

I would like, in case of a DC crash, a rebalancing of partitions onto the
healthy DC, something as follows:

DataCenter   Spark        Kafka Consumer Group   Kafka partition (P1 to P4)
DC 1         Master 1.1
             Worker 1.1   my_group               P1, P3
             Worker 1.2   my_group               P2, P4
DC 2         Master 2.1
             Worker 2.1   my_group               P3
             Worker 2.2   my_group               P4

I would like to know if it's possible:
- using consumer groups?
- using the direct approach? I prefer this one, as I don't want to activate the WAL.

Hope the explanation is better!


On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger  wrote:

> The current direct stream only handles exactly the partitions
> specified at startup.  You'd have to restart the job if you changed
> partitions.
>
> https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
> towards using the Kafka 0.10 consumer, which would allow for dynamic
> topic partitions.
>
> Regarding your multi-DC questions, I'm not really clear on what you're
> saying.
>
> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN 
> wrote:
> > Hello,
> >
> > I'm currently designing a solution where 2 distinct clusters Spark (2
> > datacenters) share the same Kafka (Kafka rack aware or manual broker
> > repartition).
> > The aims are
> > - preventing DC crash: using kafka resiliency and consumer group
> mechanism
> > (or else ?)
> > - keeping consistent offset among replica (vs mirror maker,which does not
> > keep offset)
> >
> > I have several questions
> >
> > 1) Dynamic repartition (one or 2 DC)
> >
> > I'm using KafkaDirectStream which map one partition kafka with one
> spark. Is
> > it possible to handle new or removed partition ?
> > In the compute method, it looks like we are always using the
> currentOffset
> > map to query the next batch and therefore it's always the same number of
> > partition ? Can we request metadata at each batch ?
> >
> > 2) Multi DC Spark
> >
> > Using Direct approach, a way to achieve this would be
> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
> > - only one is reading the partition (Check every x interval, "lock"
> stored
> > in cassandra for instance)
> >
> > => not sure if it works just an idea
> >
> > Using Consumer Group
> > - CommitOffset manually at the end of the batch
> >
> > => Does spark handle partition rebalancing ?
> >
> > I'd appreciate any ideas ! Let me know if it's not clear.
> >
> > Erwan
> >
> >
>


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-18 Thread Jason Nerothin
Hi Erwan,

You might consider InsightEdge: http://insightedge.io. It can handle WAN
replication between data grids and would save you the work of reinventing the
wheel. Additionally, RDDs can be shared between developers in the same DC.

Thanks,
Jason




Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-18 Thread Cody Koeninger
The current direct stream only handles exactly the partitions
specified at startup.  You'd have to restart the job if you changed
partitions.

https://issues.apache.org/jira/browse/SPARK-12177 has the ongoing work
towards using the Kafka 0.10 consumer, which would allow for dynamic
topic partitions.

Regarding your multi-DC questions, I'm not really clear on what you're saying.
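For reference, the spark-streaming-kafka-0-10 module that eventually came out
of SPARK-12177 supports pattern-based subscription, which picks up new topics
and partitions without restarting the job. A minimal sketch, assuming an
existing StreamingContext ssc; the broker address, group id, and topic pattern
are illustrative:

    import java.util.regex.Pattern
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092",             // illustrative
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "my_group",
      "auto.offset.reset"  -> "latest"
    )

    // SubscribePattern re-evaluates the regex, so topics and partitions
    // created after startup are consumed without restarting the job
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      SubscribePattern[String, String](Pattern.compile("events.*"), kafkaParams))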

>




Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-18 Thread Erwan ALLAIN
Hello,

I'm currently designing a solution where two distinct Spark clusters (two
datacenters) share the same Kafka cluster (rack-aware Kafka or manual broker
repartition).
The aims are:
- surviving a DC crash, using Kafka resiliency and the consumer group mechanism
(or something else?)
- keeping offsets consistent among replicas (unlike MirrorMaker, which does not
preserve offsets)

I have several questions

1) Dynamic repartition (one or 2 DC)

I'm using KafkaDirectStream, which maps one Kafka partition to one Spark
partition. Is it possible to handle new or removed partitions?
In the compute method, it looks like we always use the currentOffsets map to
query the next batch, so the number of partitions never changes. Can we
request metadata at each batch?

2) Multi DC Spark

Using the direct approach, a way to achieve this would be:
- to "assign" (Kafka 0.9 term) all topics to the two Spark clusters
- only one reads the partition (checking every x interval, with a "lock"
stored in Cassandra for instance)

=> not sure if it works, just an idea

Using a consumer group:
- commit offsets manually at the end of the batch

=> Does Spark handle partition rebalancing?
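The later spark-streaming-kafka-0-10 module added exactly this kind of
end-of-batch commit. A minimal sketch, assuming stream is a direct stream
created with a group.id (this API is not available in the 0.8 integration
discussed here):

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      // capture this batch's offset ranges before any transformation loses them
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process the batch ...

      // commit the consumed offsets back to Kafka at the end of the batch
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }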

I'd appreciate any ideas! Let me know if it's not clear.

Erwan


Re: Get Offset when using Spark Streaming + Kafka

2016-03-06 Thread Cody Koeninger
Have you read the materials linked from
https://github.com/koeninger/kafka-exactly-once
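The usual pattern is to read each batch's offset ranges from the direct stream
and mirror them into the ZooKeeper path the old high-level consumer uses, so
ZooKeeper-based monitoring tools see progress. A sketch only, assuming a
direct stream named stream and using Apache Curator as the ZooKeeper client;
the connect string and group name are illustrative:

    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    val zk = CuratorFrameworkFactory.newClient(
      "zkhost:2181", new ExponentialBackoffRetry(1000, 3))
    zk.start()

    stream.foreachRDD { rdd =>
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
        // old-consumer convention: /consumers/<group>/offsets/<topic>/<partition>
        val path = s"/consumers/my_group/offsets/${o.topic}/${o.partition}"
        val data = o.untilOffset.toString.getBytes("UTF-8")
        if (zk.checkExists().forPath(path) == null)
          zk.create().creatingParentsIfNeeded().forPath(path, data)
        else
          zk.setData().forPath(path, data)
      }
    }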





Get Offset when using Spark Streaming + Kafka

2016-03-06 Thread Zhun Shen
Hi,

I use KafkaUtils.createDirectStream to consume data from Kafka, but I found
that ZooKeeper-based Kafka monitoring tools could not show the progress of the
streaming application, because createDirectStream saves the offsets in
checkpoints (http://spark.apache.org/docs/latest/streaming-kafka-integration.html),
and I could not work out from the code how to update ZooKeeper myself.

Can someone please help me to solve it? 

Thanks.





Re: java.nio.channels.ClosedChannelException in Spark Streaming KafKa Direct

2016-02-01 Thread Cody Koeninger
That indicates a problem in network communication between the executor and
the kafka broker.  Have you done any network troubleshooting?





java.nio.channels.ClosedChannelException in Spark Streaming KafKa Direct

2016-02-01 Thread SRK
Hi,

I see the following error in Spark Streaming with Kafka Direct. I think this
error is related to the Kafka topic. Any suggestions on how to avoid it would
be of great help.

java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)






Re: Recovery for Spark Streaming Kafka Direct with OffsetOutOfRangeException

2016-01-21 Thread Cody Koeninger
Looks like this response did go to the list.

As far as OffsetOutOfRange goes, right now that's an unrecoverable error,
because it breaks the underlying invariants (e.g. that the number of
messages in a partition is deterministic once the RDD is defined)

If you want to do some hacking for your own purposes, the place to start
looking would be in KafkaRDD.scala, in fetchBatch.  Just be aware that's a
situation where data has been lost, so you can't get the "right" answer,
you just have to decide what variety of wrong answer you want to get :)




Recovery for Spark Streaming Kafka Direct with OffsetOutOfRangeException

2016-01-21 Thread Dan Dutrow
Hey Cody, I would have responded to the mailing list but it looks like this
thread got aged off. I have a problem where one of my topics dumps more data
than my Spark job can keep up with. We limit the input rate with
maxRatePerPartition. Eventually, when the data is aged off, I get the
OffsetOutOfRangeException from Kafka, as we would expect. As we work towards
more efficient processing of that topic, or get more resources, I'd like to be
able to log the error and continue the application without failing. Is there a
place where I can catch that error before it gets to
org.apache.spark.util.Utils$.logUncaughtExceptions? Maybe somewhere in
DirectKafkaInputDStream::compute?

https://mail-archives.apache.org/mod_mbox/spark-user/201512.mbox/%3CCAKWX9VUoNd4ATGF+0TkNJ+9=b8r2nr9pt7sbgr-bv4nnttk...@mail.gmail.com%3E
-- 
Dan ✆
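For reference, the rate limit Dan mentions is plain Spark configuration; a
sketch with illustrative values:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-direct-job")
      // cap the records/sec read from each Kafka partition (value illustrative)
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
      // Spark 1.5+: adapt the ingest rate to the observed processing speed
      .set("spark.streaming.backpressure.enabled", "true")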


Re: Spark Streaming + Kafka + scala job message read issue

2016-01-15 Thread vivek.meghanathan
All,
The issue was related to Apache Cassandra. I changed from Apache Cassandra to
DataStax Cassandra and the issue is resolved. I also changed the Spark version
to 1.3.

There is some serious issue between the Spark Cassandra connector and Apache
Cassandra 2.1+ when used in Spark streaming jobs.

Regards
Vivek

RE: Spark Streaming + Kafka + scala job message read issue

2016-01-05 Thread vivek.meghanathan
Hello All,

After investigating further with a test program, we were able to read the
Kafka input messages using Spark streaming.

Once we add a particular line which performs map, reduce, and groupByKey (all
written in a single line), we no longer see the input message details in the
logs. We have increased the batch interval to 5 seconds and removed the
numTasks setting (it was defined as 10). Once we made this change, the Kafka
messages started to get processed, but processing takes a long time.
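The batch-interval change described above is made where the StreamingContext
is created; a minimal sketch:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf().setAppName("kafka-consumer") // app name illustrative
    val ssc = new StreamingContext(sparkConf, Seconds(5))        // 5-second batches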

This works fine on our local lab server; the problem is on the Google Compute
Engine server. The local lab server is low-spec (8 CPUs, 8 GB RAM) while the
cloud server is a high-memory machine (30 GB RAM, 8 CPUs). As far as I can
see, execution happens much faster on the Google platform, but somehow the job
processing gets messed up.

Any suggestions?


Regards,
Vivek M




Re: Spark Streaming + Kafka + scala job message read issue

2015-12-26 Thread vivek.meghanathan
Hi Bryan,
Yes, we are using only one thread per topic, as we have only one Kafka server
with one partition.
What kind of logs would tell us what offset the Spark stream is reading from
Kafka, or whether it is resetting something without reading?

Regards
Vivek



RE: Spark Streaming + Kafka + scala job message read issue

2015-12-26 Thread Bryan
Vivek,

Where you're using numThreads, look at the documentation for createStream. I
believe that number should be the number of partitions to consume.
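In other words, for the receiver-based createStream the map's value is the
number of partitions (receiver threads) consumed per topic, and it should
match the topic's partition count. A sketch, assuming ssc, zkQuorum, and group
are already defined; the topic name is illustrative:

    import org.apache.spark.streaming.kafka.KafkaUtils

    // topic -> number of receiver threads; should equal the topic's partition count
    val topicMap = Map("search_in" -> 1)
    val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)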


Re: Spark Streaming + Kafka + scala job message read issue

2015-12-25 Thread vivek.meghanathan
Hi Bryan, PhuDuc,

All 8 jobs consume 8 different IN topics: 8 different Scala jobs, each running
with the topic map mentioned below, with only one thread specified. In this
case the group should not be a problem, right?

Here is the complete flow: Spring MVC sends messages to Kafka, Spark streaming
reads them and sends messages back to Kafka; in some cases the jobs only
update data in Cassandra, and Spring reads the response messages.
I can see the messages always reach Kafka (checked through the console
consumer).

Regards
Vivek



RE: Spark Streaming + Kafka + scala job message read issue

2015-12-25 Thread Bryan
Agreed. I did not see that they were using the same group name.





Re: Spark Streaming + Kafka + scala job message read issue

2015-12-25 Thread PhuDuc Nguyen
Vivek,

Did you say you have 8 Spark jobs that are consuming from the same topic,
and all jobs are using the same consumer group name? If so, each job would
get a subset of the messages from that Kafka topic, i.e. each job would get
1 out of 8 messages from that topic. Is that your intent?

regards,
Duc
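If the jobs ever did share a topic, giving each job its own consumer group
would keep every job receiving the full stream; a one-line sketch (naming
illustrative):

    // a distinct group per job: each job then sees every message of its topic
    val group = s"my_app_$topic"   // e.g. "my_app_search_in"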








RE: Spark Streaming + Kafka + scala job message read issue

2015-12-25 Thread Bryan
Vivek,

https://spark.apache.org/docs/1.5.2/streaming-kafka-integration.html

The map is of topic name to the number of partitions to consume per topic. Is
numThreads below equal to the number of partitions in your topic?

Regards,

Bryan Jeffrey


Re: Spark Streaming + Kafka + scala job message read issue

2015-12-25 Thread vivek.meghanathan
Any help is highly appreciated; I am completely stuck here.





RE: Spark Streaming + Kafka + scala job message read issue

2015-12-24 Thread vivek.meghanathan
We are using the older receiver-based approach. The number of partitions is 1 
(we have a single-node Kafka) and we use a single thread per topic, yet we still 
have the problem. Please see the API we use below. All 8 Spark jobs use the same 
group name – is that a problem?

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap  // number of threads used here is 1
val searches = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(line => parse(line._2).extract[Search])


Regards,
Vivek M
From: Bryan [mailto:bryan.jeff...@gmail.com]
Sent: 24 December 2015 17:20
To: Vivek Meghanathan (WT01 - NEP) ; 
user@spark.apache.org
Subject: RE: Spark Streaming + Kafka + scala job message read issue

Are you using a direct stream consumer, or the older receiver-based consumer? 
If the latter, does the number of partitions you've specified for your topic 
match the number of partitions in the topic on Kafka?

That would be a possible cause – you might receive all data from a given 
partition while missing data from other partitions.

Regards,

Bryan Jeffrey



From: vivek.meghanat...@wipro.com<mailto:vivek.meghanat...@wipro.com>
Sent: Thursday, December 24, 2015 5:22 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Spark Streaming + Kafka + scala job message read issue

Hi All,



We are using Bitnami Kafka 0.8.2 + Spark 1.5.2 on Google Cloud Platform. Our 
Spark streaming job (the consumer) is not receiving all the messages sent to a 
specific topic. It receives only 1 out of ~50 messages (we added a log in the 
job stream and identified this). We are not seeing any errors in the Kafka logs 
and are unable to debug further at the Kafka layer. The console consumer shows 
that the INPUT topic receives the messages; they are just not reaching the 
Spark-Kafka integration stream. Any thoughts on how to debug this issue? 
Another topic is working fine in the same setup.

We also tried Spark 1.3.0 with Kafka 0.8.1.1, which has the same issue. All 
these jobs work fine on our local lab servers.

Regards,
Vivek M


RE: Spark Streaming + Kafka + scala job message read issue

2015-12-24 Thread Bryan
Are you using a direct stream consumer, or the older receiver-based consumer? 
If the latter, does the number of partitions you've specified for your topic 
match the number of partitions in the topic on Kafka?

That would be a possible cause – you might receive all data from a given 
partition while missing data from other partitions.

Regards,

Bryan Jeffrey



From: vivek.meghanat...@wipro.com
Sent: Thursday, December 24, 2015 5:22 AM
To: user@spark.apache.org
Subject: Spark Streaming + Kafka + scala job message read issue

Hi All,


We are using Bitnami Kafka 0.8.2 + Spark 1.5.2 on Google Cloud Platform. Our 
Spark streaming job (the consumer) is not receiving all the messages sent to a 
specific topic. It receives only 1 out of ~50 messages (we added a log in the 
job stream and identified this). We are not seeing any errors in the Kafka logs 
and are unable to debug further at the Kafka layer. The console consumer shows 
that the INPUT topic receives the messages; they are just not reaching the 
Spark-Kafka integration stream. Any thoughts on how to debug this issue? 
Another topic is working fine in the same setup.
We also tried Spark 1.3.0 with Kafka 0.8.1.1, which has the same issue. All 
these jobs work fine on our local lab servers.
Regards,
Vivek M



Spark Streaming + Kafka + scala job message read issue

2015-12-24 Thread vivek.meghanathan
Hi All,




We are using Bitnami Kafka 0.8.2 + Spark 1.5.2 on Google Cloud Platform. Our 
Spark streaming job (the consumer) is not receiving all the messages sent to a 
specific topic. It receives only 1 out of ~50 messages (we added a log in the 
job stream and identified this). We are not seeing any errors in the Kafka logs 
and are unable to debug further at the Kafka layer. The console consumer shows 
that the INPUT topic receives the messages; they are just not reaching the 
Spark-Kafka integration stream. Any thoughts on how to debug this issue? 
Another topic is working fine in the same setup.

We also tried Spark 1.3.0 with Kafka 0.8.1.1, which has the same issue. All 
these jobs work fine on our local lab servers.

Regards,
Vivek M


Re: Need to maintain the consumer offset by myself when using spark streaming kafka direct approach?

2015-12-08 Thread Dibyendu Bhattacharya
With the direct stream, the checkpoint location is not recoverable if you modify
your driver code. So if you rely only on the checkpoint to commit offsets, you
can lose messages if you modify the driver code and then start from the
"largest" offset. If you do not want to lose messages, you need to commit
offsets to an external store when using the direct stream.
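
A minimal sketch of that pattern with the Spark 1.x / Kafka 0.8 direct API; the
broker address and topic are placeholders, and saveOffset stands in for whatever
ZooKeeper or database writer you choose:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Minimal sketch: read each batch's per-partition offset ranges and commit
// them to an external store after the batch is processed. ssc is an existing
// StreamingContext; saveOffset is a hypothetical helper.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("test_topic"))  // placeholder topic

stream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch first, then record how far we got ...
  ranges.foreach(o => saveOffset(o.topic, o.partition, o.untilOffset))
}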

On Tue, Dec 8, 2015 at 7:47 PM, PhuDuc Nguyen 
wrote:

> Kafka Receiver-based approach:
> This will maintain the consumer offsets in ZK for you.
>
> Kafka Direct approach:
> You can use checkpointing and that will maintain consumer offsets for you.
> You'll want to checkpoint to a highly available file system like HDFS or S3.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
>
> You don't have to maintain your own offsets if you don't want to. If the 2
> solutions above don't satisfy your requirements, then consider writing your
> own; otherwise I would recommend using the supported features in Spark.
>
> HTH,
> Duc
>
>
>
> On Tue, Dec 8, 2015 at 5:05 AM, Tao Li  wrote:
>
>> I am using spark streaming kafka direct approach these days. I found that
>> when I start the application, it always start consumer the latest offset. I
>> hope that when application start, it consume from the offset last
>> application consumes with the same kafka consumer group. It means I have to
>> maintain the consumer offset by my self, for example record it on
>> zookeeper, and reload the last offset from zookeeper when restarting the
>> applicaiton?
>>
>> I see the following discussion:
>> https://github.com/apache/spark/pull/4805
>> https://issues.apache.org/jira/browse/SPARK-6249
>>
>> Is there any conclusion? Do we need to maintain the offset by myself? Or
>> spark streaming will support a feature to simplify the offset maintain work?
>>
>>
>> https://forums.databricks.com/questions/2936/need-to-maintain-the-consumer-offset-by-myself-whe.html
>>
>
>


Re: Need to maintain the consumer offset by myself when using spark streaming kafka direct approach?

2015-12-08 Thread PhuDuc Nguyen
Kafka Receiver-based approach:
This will maintain the consumer offsets in ZK for you.

Kafka Direct approach:
You can use checkpointing and that will maintain consumer offsets for you.
You'll want to checkpoint to a highly available file system like HDFS or S3.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
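
A minimal sketch of that checkpoint-based recovery; the checkpoint path, app
name, and batch interval are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch: build the context inside a factory so Spark can recreate it
// (consumed offsets included) from the checkpoint on restart.
val checkpointDir = "hdfs:///checkpoints/kafka-direct-app"  // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-direct-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... create the direct stream and wire up the processing here ...
  ssc
}

// The first run creates the context; a restart rebuilds it from the checkpoint.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()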

You don't have to maintain your own offsets if you don't want to. If the 2
solutions above don't satisfy your requirements, then consider writing your
own; otherwise I would recommend using the supported features in Spark.

HTH,
Duc



On Tue, Dec 8, 2015 at 5:05 AM, Tao Li  wrote:

> I am using spark streaming kafka direct approach these days. I found that
> when I start the application, it always start consumer the latest offset. I
> hope that when application start, it consume from the offset last
> application consumes with the same kafka consumer group. It means I have to
> maintain the consumer offset by my self, for example record it on
> zookeeper, and reload the last offset from zookeeper when restarting the
> applicaiton?
>
> I see the following discussion:
> https://github.com/apache/spark/pull/4805
> https://issues.apache.org/jira/browse/SPARK-6249
>
> Is there any conclusion? Do we need to maintain the offset by myself? Or
> spark streaming will support a feature to simplify the offset maintain work?
>
>
> https://forums.databricks.com/questions/2936/need-to-maintain-the-consumer-offset-by-myself-whe.html
>


RE: Need to maintain the consumer offset by myself when using spark streaming kafka direct approach?

2015-12-08 Thread Singh, Abhijeet
You need to maintain the offsets yourself, and rightly so, in something like 
ZooKeeper.
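
A minimal sketch of resuming a direct stream from offsets stored that way, using
the Spark 1.x / Kafka 0.8 API; the offsets shown stand in for whatever your
ZooKeeper reader returns, and ssc and kafkaParams are assumed to exist:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Minimal sketch: hand the stored offsets to the direct stream as its
// starting point (the map below is a placeholder for a ZooKeeper lookup).
val fromOffsets: Map[TopicAndPartition, Long] =
  Map(TopicAndPartition("test_topic", 0) -> 42L)

val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)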

From: Tao Li [mailto:litao.bupt...@gmail.com]
Sent: Tuesday, December 08, 2015 5:36 PM
To: user@spark.apache.org
Subject: Need to maintain the consumer offset by myself when using spark 
streaming kafka direct approach?

I am using the Spark Streaming Kafka direct approach these days. I found that when 
I start the application, it always starts consuming from the latest offset. I would 
like the application, when it starts, to consume from the offset where the last run 
with the same Kafka consumer group left off. Does that mean I have to maintain the 
consumer offset myself, for example by recording it in ZooKeeper and reloading the 
last offset from ZooKeeper when restarting the application?

I see the following discussion:
https://github.com/apache/spark/pull/4805
https://issues.apache.org/jira/browse/SPARK-6249

Is there any conclusion? Do we need to maintain the offsets ourselves, or will Spark 
Streaming support a feature to simplify the offset maintenance work?

https://forums.databricks.com/questions/2936/need-to-maintain-the-consumer-offset-by-myself-whe.html


Need to maintain the consumer offset by myself when using spark streaming kafka direct approach?

2015-12-08 Thread Tao Li
I am using the Spark Streaming Kafka direct approach these days. I found that
when I start the application, it always starts consuming from the latest offset.
I would like the application, when it starts, to consume from the offset where
the last run with the same Kafka consumer group left off. Does that mean I have
to maintain the consumer offset myself, for example by recording it in ZooKeeper
and reloading the last offset from ZooKeeper when restarting the application?

I see the following discussion:
https://github.com/apache/spark/pull/4805
https://issues.apache.org/jira/browse/SPARK-6249

Is there any conclusion? Do we need to maintain the offsets ourselves, or will
Spark Streaming support a feature to simplify the offset maintenance work?

https://forums.databricks.com/questions/2936/need-to-maintain-the-consumer-offset-by-myself-whe.html


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-03 Thread Cody Koeninger
>>>>>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>>>>>
>>>>>>>
>>>>>>> Dibyendu
>>>>>>>
>>>>>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>>>>>> swethakasire...@gmail.com> wrote:
>>>>>>>
>>>>>>>> How to avoid those Errors with receiver based approach? Suppose we
>>>>>>>> are OK with at least once processing and use receiver based approach 
>>>>>>>> which
>>>>>>>> uses ZooKeeper but not query Kafka directly, would these 
>>>>>>>> errors(Couldn't
>>>>>>>> find leader offsets for
>>>>>>>> Set([test_stream,5])))be avoided?
>>>>>>>>
>>>>>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> KafkaRDD.scala , handleFetchErr
>>>>>>>>>
>>>>>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>>>>>>> swethakasire...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Cody,
>>>>>>>>>>
>>>>>>>>>> How to look at Option 2(see the following)? Which portion of the
>>>>>>>>>> code in Spark Kafka Direct to look at to handle this issue specific 
>>>>>>>>>> to our
>>>>>>>>>> requirements.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2.Catch that exception and somehow force things to "reset" for
>>>>>>>>>> that
>>>>>>>>>> partition And how would it handle the offsets already calculated
>>>>>>>>>> in the
>>>>>>>>>> backlog (if there is one)?
>>>>>>>>>>
>>>>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <
>>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> If you're consistently getting offset out of range exceptions,
>>>>>>>>>>> it's probably because messages are getting deleted before you've 
>>>>>>>>>>> processed
>>>>>>>>>>> them.
>>>>>>>>>>>
>>>>>>>>>>> The only real way to deal with this is give kafka more
>>>>>>>>>>> retention, consume faster, or both.
>>>>>>>>>>>
>>>>>>>>>>> If you're just looking for a quick "fix" for an infrequent
>>>>>>>>>>> issue, option 4 is probably easiest.  I wouldn't do that 
>>>>>>>>>>> automatically /
>>>>>>>>>>> silently, because you're losing data.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK 
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> So, our Streaming Job fails with the following errors. If you
>>>>>>>>>>>> see the errors
>>>>>>>>>>>> below, they are all related to Kafka losing offsets and
>>>>>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>>>>>
>>>>>>>>>>>> What are the options we have other than fixing Kafka? We would
>>>>>>>>>>>> like to do
>>>>>>>>>>>> something like the following. How can we achieve 1 and 2 with
>>>>>>>>>>>> Spark Kafka
>>>>>>>>>>>> Direct?
>>>>>>>>>>>>
>>>>>>>>>>>> 1.Need to see a way to skip some offsets if they are not
>>>>>>>>>>>> available after the
>>>>>>>>>>>> max retries are reached..in that case there might be data loss.
>>>>>>>>>>>>
>>>>>>

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
>>>>>>>> KafkaRDD.scala, handleFetchErr
>>>>>>>>
>>>>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>>>>>> swethakasire...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Cody,
>>>>>>>>>
>>>>>>>>> How to look at Option 2(see the following)? Which portion of the
>>>>>>>>> code in Spark Kafka Direct to look at to handle this issue specific 
>>>>>>>>> to our
>>>>>>>>> requirements.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>>>>>> partition And how would it handle the offsets already calculated
>>>>>>>>> in the
>>>>>>>>> backlog (if there is one)?
>>>>>>>>>
>>>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger >>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> If you're consistently getting offset out of range exceptions,
>>>>>>>>>> it's probably because messages are getting deleted before you've 
>>>>>>>>>> processed
>>>>>>>>>> them.
>>>>>>>>>>
>>>>>>>>>> The only real way to deal with this is give kafka more retention,
>>>>>>>>>> consume faster, or both.
>>>>>>>>>>
>>>>>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>>>>>>>> option 4 is probably easiest.  I wouldn't do that automatically / 
>>>>>>>>>> silently,
>>>>>>>>>> because you're losing data.
>>>>>>>>>>
>>>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> So, our Streaming Job fails with the following errors. If you
>>>>>>>>>>> see the errors
>>>>>>>>>>> below, they are all related to Kafka losing offsets and
>>>>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>>>>
>>>>>>>>>>> What are the options we have other than fixing Kafka? We would
>>>>>>>>>>> like to do
>>>>>>>>>>> something like the following. How can we achieve 1 and 2 with
>>>>>>>>>>> Spark Kafka
>>>>>>>>>>> Direct?
>>>>>>>>>>>
>>>>>>>>>>> 1.Need to see a way to skip some offsets if they are not
>>>>>>>>>>> available after the
>>>>>>>>>>> max retries are reached..in that case there might be data loss.
>>>>>>>>>>>
>>>>>>>>>>> 2.Catch that exception and somehow force things to "reset" for
>>>>>>>>>>> that
>>>>>>>>>>> partition And how would it handle the offsets already calculated
>>>>>>>>>>> in the
>>>>>>>>>>> backlog (if there is one)?
>>>>>>>>>>>
>>>>>>>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>>>>>>>> offsets.
>>>>>>>>>>>
>>>>>>>>>>> 4.Or a straightforward approach would be to monitor the log for
>>>>>>>>>>> this error,
>>>>>>>>>>> and if it occurs more than X times, kill the job, remove the
>>>>>>>>>>> checkpoint
>>>>>>>>>>> directory, and restart.
>>>>>>>>>>>
>>>>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>>>>> Set([test_stream,5]))
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>>>>
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>>>>> [Terminated,
>>>>>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed
>>>>>>>>>>> tasks =
>>>>>>>>>>> 12112]
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>> failure: Task 10
>>>>>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task
>>>>>>>>>>> 10.3 in stage
>>>>>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>>>
>>>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>>>> java.lang.InterruptedException
>>>>>>>>>>>
>>>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>> failure: Task 7 in
>>>>>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>>>>>>> stage 33.0
>>>>>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> View this message in context:
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -
>>>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Cody Koeninger
>>>>>>>> partition And how would it handle the offsets already calculated in
>>>>>>>> the
>>>>>>>> backlog (if there is one)?
>>>>>>>>
>>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> If you're consistently getting offset out of range exceptions,
>>>>>>>>> it's probably because messages are getting deleted before you've 
>>>>>>>>> processed
>>>>>>>>> them.
>>>>>>>>>
>>>>>>>>> The only real way to deal with this is give kafka more retention,
>>>>>>>>> consume faster, or both.
>>>>>>>>>
>>>>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>>>>>>> option 4 is probably easiest.  I wouldn't do that automatically / 
>>>>>>>>> silently,
>>>>>>>>> because you're losing data.
>>>>>>>>>
>>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> So, our Streaming Job fails with the following errors. If you see
>>>>>>>>>> the errors
>>>>>>>>>> below, they are all related to Kafka losing offsets and
>>>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>>>
>>>>>>>>>> What are the options we have other than fixing Kafka? We would
>>>>>>>>>> like to do
>>>>>>>>>> something like the following. How can we achieve 1 and 2 with
>>>>>>>>>> Spark Kafka
>>>>>>>>>> Direct?
>>>>>>>>>>
>>>>>>>>>> 1.Need to see a way to skip some offsets if they are not
>>>>>>>>>> available after the
>>>>>>>>>> max retries are reached..in that case there might be data loss.
>>>>>>>>>>
>>>>>>>>>> 2.Catch that exception and somehow force things to "reset" for
>>>>>>>>>> that
>>>>>>>>>> partition And how would it handle the offsets already calculated
>>>>>>>>>> in the
>>>>>>>>>> backlog (if there is one)?
>>>>>>>>>>
>>>>>>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>>>>>>> offsets.
>>>>>>>>>>
>>>>>>>>>> 4.Or a straightforward approach would be to monitor the log for
>>>>>>>>>> this error,
>>>>>>>>>> and if it occurs more than X times, kill the job, remove the
>>>>>>>>>> checkpoint
>>>>>>>>>> directory, and restart.
>>>>>>>>>>
>>>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>>>> Set([test_stream,5]))
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>>>
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>>>> [Terminated,
>>>>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed
>>>>>>>>>> tasks =
>>>>>>>>>> 12112]
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>> failure: Task 10
>>>>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3
>>>>>>>>>> in stage
>>>>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>>
>>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>>> java.lang.InterruptedException
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>>
>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>> failure: Task 7 in
>>>>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>>>>>> stage 33.0
>>>>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>>
>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>
>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> View this message in context:
>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>> Nabble.com.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -
>>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
>>>>>>>> option 4 is probably easiest.  I wouldn't do that automatically / 
>>>>>>>> silently,
>>>>>>>> because you're losing data.
>>>>>>>>
>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> So, our Streaming Job fails with the following errors. If you see
>>>>>>>>> the errors
>>>>>>>>> below, they are all related to Kafka losing offsets and
>>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>>
>>>>>>>>> What are the options we have other than fixing Kafka? We would
>>>>>>>>> like to do
>>>>>>>>> something like the following. How can we achieve 1 and 2 with
>>>>>>>>> Spark Kafka
>>>>>>>>> Direct?
>>>>>>>>>
>>>>>>>>> 1.Need to see a way to skip some offsets if they are not available
>>>>>>>>> after the
>>>>>>>>> max retries are reached..in that case there might be data loss.
>>>>>>>>>
>>>>>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>>>>>> partition And how would it handle the offsets already calculated
>>>>>>>>> in the
>>>>>>>>> backlog (if there is one)?
>>>>>>>>>
>>>>>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>>>>>> offsets.
>>>>>>>>>
>>>>>>>>> 4.Or a straightforward approach would be to monitor the log for
>>>>>>>>> this error,
>>>>>>>>> and if it occurs more than X times, kill the job, remove the
>>>>>>>>> checkpoint
>>>>>>>>> directory, and restart.
>>>>>>>>>
>>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>>> Set([test_stream,5]))
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>>
>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>>> [Terminated,
>>>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed
>>>>>>>>> tasks =
>>>>>>>>> 12112]
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>> Task 10
>>>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3
>>>>>>>>> in stage
>>>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>
>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>> java.lang.InterruptedException
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>> Task 7 in
>>>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>>>>> stage 33.0
>>>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> View this message in context:
>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>> Nabble.com.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -
>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Cody Koeninger
>>>>>>>> Kafka
>>>>>>>> Direct?
>>>>>>>>
>>>>>>>> 1.Need to see a way to skip some offsets if they are not available
>>>>>>>> after the
>>>>>>>> max retries are reached..in that case there might be data loss.
>>>>>>>>
>>>>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>>>>> partition And how would it handle the offsets already calculated in
>>>>>>>> the
>>>>>>>> backlog (if there is one)?
>>>>>>>>
>>>>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>>>>> offsets.
>>>>>>>>
>>>>>>>> 4.Or a straightforward approach would be to monitor the log for
>>>>>>>> this error,
>>>>>>>> and if it occurs more than X times, kill the job, remove the
>>>>>>>> checkpoint
>>>>>>>> directory, and restart.
>>>>>>>>
>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>> Set([test_stream,5]))
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>
>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>> [Terminated,
>>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed
>>>>>>>> tasks =
>>>>>>>> 12112]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>> Task 10
>>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3
>>>>>>>> in stage
>>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>
>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>> java.lang.InterruptedException
>>>>>>>>
>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>> Task 7 in
>>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>>>> stage 33.0
>>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context:
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>> Nabble.com.
>>>>>>>>
>>>>>>>>
>>>>>>>> -
>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
>>>>>>> and if it occurs more than X times, kill the job, remove the
>>>>>>> checkpoint
>>>>>>> directory, and restart.
>>>>>>>
>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>> Set([test_stream,5]))
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> java.lang.ClassNotFoundException:
>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>
>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>> [Terminated,
>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks
>>>>>>> =
>>>>>>> 12112]
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Task 10
>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>>>>>> stage
>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>
>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>> java.lang.InterruptedException
>>>>>>>
>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>
>>>>>>> java.lang.ClassNotFoundException:
>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Task 7 in
>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>>> stage 33.0
>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>
>>>>>>> java.lang.ClassNotFoundException:
>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>
>>>>>>> java.lang.ClassNotFoundException:
>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>> -
>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Cody Koeninger
>>>>>> java.lang.ClassNotFoundException: kafka.common.NotLeaderForPartitionException
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>
>>>>>>
>>>>>>
>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>
>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>> [Terminated,
>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>>>>> 12112]
>>>>>>
>>>>>>
>>>>>>
>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> Task 10
>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>>>>> stage
>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>
>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>> java.lang.InterruptedException
>>>>>>
>>>>>> Caused by: java.lang.InterruptedException
>>>>>>
>>>>>> java.lang.ClassNotFoundException:
>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>
>>>>>>
>>>>>>
>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> Task 7 in
>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>> stage 33.0
>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>
>>>>>> java.lang.ClassNotFoundException:
>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>
>>>>>> java.lang.ClassNotFoundException:
>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> -
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread Dibyendu Bhattacharya
Hi, if you use the receiver-based consumer available in spark-packages
(http://spark-packages.org/package/dibbhatt/kafka-spark-consumer), it has all
the failure recovery built in and can recover from any Kafka leader changes and
offset-out-of-range issues.

Here is the package on GitHub:
https://github.com/dibbhatt/kafka-spark-consumer


Dibyendu

On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy 
wrote:

> How to avoid those Errors with receiver based approach? Suppose we are OK
> with at least once processing and use receiver based approach which uses
> ZooKeeper but not query Kafka directly, would these errors(Couldn't find
> leader offsets for
> Set([test_stream,5])))be avoided?
>
> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger  wrote:
>
>> KafkaRDD.scala , handleFetchErr
>>
>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi Cody,
>>>
>>> How to look at Option 2(see the following)? Which portion of the code in
>>> Spark Kafka Direct to look at to handle this issue specific to our
>>> requirements.
>>>
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>>> wrote:
>>>
>>>> If you're consistently getting offset out of range exceptions, it's
>>>> probably because messages are getting deleted before you've processed them.
>>>>
>>>> The only real way to deal with this is give kafka more retention,
>>>> consume faster, or both.
>>>>
>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>> option 4 is probably easiest.  I wouldn't do that automatically / silently,
>>>> because you're losing data.
>>>>
>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> So, our Streaming Job fails with the following errors. If you see the
>>>>> errors
>>>>> below, they are all related to Kafka losing offsets and
>>>>> OffsetOutOfRangeException.
>>>>>
>>>>> What are the options we have other than fixing Kafka? We would like to
>>>>> do
>>>>> something like the following. How can we achieve 1 and 2 with Spark
>>>>> Kafka
>>>>> Direct?
>>>>>
>>>>> 1.Need to see a way to skip some offsets if they are not available
>>>>> after the
>>>>> max retries are reached..in that case there might be data loss.
>>>>>
>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>> partition And how would it handle the offsets already calculated in the
>>>>> backlog (if there is one)?
>>>>>
>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>> offsets.
>>>>>
>>>>> 4.Or a straightforward approach would be to monitor the log for this
>>>>> error,
>>>>> and if it occurs more than X times, kill the job, remove the checkpoint
>>>>> directory, and restart.
>>>>>
>>>>> ERROR DirectKafkaInputDStream:
>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>> Set([test_stream,5]))
>>>>>
>>>>>
>>>>>
>>>>> java.lang.ClassNotFoundException:
>>>>> kafka.common.NotLeaderForPartitionException
>>>>>
>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>
>>>>>
>>>>>
>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>
>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>> [Terminated,
>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>>>> 12112]
>>>>>
>>>>>
>>>>>
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task 10
>>>>> in stage 52.0 failed 4 times

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
How can we avoid those errors with the receiver-based approach? Suppose we are OK
with at-least-once processing and use the receiver-based approach, which uses
ZooKeeper and does not query Kafka directly - would these errors (Couldn't find
leader offsets for Set([test_stream,5])) be avoided?

On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger  wrote:

> KafkaRDD.scala , handleFetchErr
>
> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> How to look at Option 2(see the following)? Which portion of the code in
>> Spark Kafka Direct to look at to handle this issue specific to our
>> requirements.
>>
>>
>> 2.Catch that exception and somehow force things to "reset" for that
>> partition And how would it handle the offsets already calculated in the
>> backlog (if there is one)?
>>
>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>> wrote:
>>
>>> If you're consistently getting offset out of range exceptions, it's
>>> probably because messages are getting deleted before you've processed them.
>>>
>>> The only real way to deal with this is give kafka more retention,
>>> consume faster, or both.
>>>
>>> If you're just looking for a quick "fix" for an infrequent issue, option
>>> 4 is probably easiest.  I wouldn't do that automatically / silently,
>>> because you're losing data.
>>>
>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:
>>>
>>>> Hi,
>>>>
>>>> So, our Streaming Job fails with the following errors. If you see the
>>>> errors
>>>> below, they are all related to Kafka losing offsets and
>>>> OffsetOutOfRangeException.
>>>>
>>>> What are the options we have other than fixing Kafka? We would like to
>>>> do
>>>> something like the following. How can we achieve 1 and 2 with Spark
>>>> Kafka
>>>> Direct?
>>>>
>>>> 1.Need to see a way to skip some offsets if they are not available
>>>> after the
>>>> max retries are reached..in that case there might be data loss.
>>>>
>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>> partition And how would it handle the offsets already calculated in the
>>>> backlog (if there is one)?
>>>>
>>>> 3.Track the offsets separately, restart the job by providing the
>>>> offsets.
>>>>
>>>> 4.Or a straightforward approach would be to monitor the log for this
>>>> error,
>>>> and if it occurs more than X times, kill the job, remove the checkpoint
>>>> directory, and restart.
>>>>
>>>> ERROR DirectKafkaInputDStream:
>>>> ArrayBuffer(kafka.common.UnknownException,
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([test_stream,5]))
>>>>
>>>>
>>>>
>>>> java.lang.ClassNotFoundException:
>>>> kafka.common.NotLeaderForPartitionException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>>
>>>>
>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>
>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>> [Terminated,
>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>>> 12112]
>>>>
>>>>
>>>>
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 10
>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>>> stage
>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>
>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>> java.lang.InterruptedException
>>>>
>>>> Caused by: java.lang.InterruptedException
>>>>
>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>>
>>>>
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 7 in
>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>>>> 33.0
>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>
>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread Cody Koeninger
KafkaRDD.scala, handleFetchErr

On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy 
wrote:

> Hi Cody,
>
> How to look at Option 2(see the following)? Which portion of the code in
> Spark Kafka Direct to look at to handle this issue specific to our
> requirements.
>
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger  wrote:
>
>> If you're consistently getting offset out of range exceptions, it's
>> probably because messages are getting deleted before you've processed them.
>>
>> The only real way to deal with this is give kafka more retention, consume
>> faster, or both.
>>
>> If you're just looking for a quick "fix" for an infrequent issue, option
>> 4 is probably easiest.  I wouldn't do that automatically / silently,
>> because you're losing data.
>>
>> On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:
>>
>>> Hi,
>>>
>>> So, our Streaming Job fails with the following errors. If you see the
>>> errors
>>> below, they are all related to Kafka losing offsets and
>>> OffsetOutOfRangeException.
>>>
>>> What are the options we have other than fixing Kafka? We would like to do
>>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>>> Direct?
>>>
>>> 1.Need to see a way to skip some offsets if they are not available after
>>> the
>>> max retries are reached..in that case there might be data loss.
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> 3.Track the offsets separately, restart the job by providing the offsets.
>>>
>>> 4.Or a straightforward approach would be to monitor the log for this
>>> error,
>>> and if it occurs more than X times, kill the job, remove the checkpoint
>>> directory, and restart.
>>>
>>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([test_stream,5]))
>>>
>>>
>>>
>>> java.lang.ClassNotFoundException:
>>> kafka.common.NotLeaderForPartitionException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> java.util.concurrent.RejectedExecutionException: Task
>>>
>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>> [Terminated,
>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>> 12112]
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 10
>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>> stage
>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>
>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>> java.lang.InterruptedException
>>>
>>> Caused by: java.lang.InterruptedException
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 7 in
>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>>> 33.0
>>> (TID 283, 172.16.97.103): UnknownReason
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
Following is the Option 2 that I was talking about:

2.Catch that exception and somehow force things to "reset" for that
partition And how would it handle the offsets already calculated in the
backlog (if there is one)?

On Tue, Dec 1, 2015 at 1:39 PM, swetha kasireddy 
wrote:

> Hi Cody,
>
> How to look at Option 2(see the following)? Which portion of the code in
> Spark Kafka Direct to look at to handle this issue specific to our
> requirements.
>
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger  wrote:
>
>> If you're consistently getting offset out of range exceptions, it's
>> probably because messages are getting deleted before you've processed them.
>>
>> The only real way to deal with this is give kafka more retention, consume
>> faster, or both.
>>
>> If you're just looking for a quick "fix" for an infrequent issue, option
>> 4 is probably easiest.  I wouldn't do that automatically / silently,
>> because you're losing data.
>>
>> On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:
>>
>>> Hi,
>>>
>>> So, our Streaming Job fails with the following errors. If you see the
>>> errors
>>> below, they are all related to Kafka losing offsets and
>>> OffsetOutOfRangeException.
>>>
>>> What are the options we have other than fixing Kafka? We would like to do
>>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>>> Direct?
>>>
>>> 1.Need to see a way to skip some offsets if they are not available after
>>> the
>>> max retries are reached..in that case there might be data loss.
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> 3.Track the offsets separately, restart the job by providing the offsets.
>>>
>>> 4.Or a straightforward approach would be to monitor the log for this
>>> error,
>>> and if it occurs more than X times, kill the job, remove the checkpoint
>>> directory, and restart.
>>>
>>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([test_stream,5]))
>>>
>>>
>>>
>>> java.lang.ClassNotFoundException:
>>> kafka.common.NotLeaderForPartitionException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> java.util.concurrent.RejectedExecutionException: Task
>>>
>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>> [Terminated,
>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>> 12112]
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 10
>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>> stage
>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>
>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>> java.lang.InterruptedException
>>>
>>> Caused by: java.lang.InterruptedException
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 7 in
>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>>> 33.0
>>> (TID 283, 172.16.97.103): UnknownReason
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
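For option 3, here is a minimal sketch against the Spark 1.x
spark-streaming-kafka (Kafka 0.8) direct API that this thread is using. The
broker address and topic are placeholders, and loadOffsets/saveOffsets are
hypothetical helpers for whatever external store (ZooKeeper, a database,
HDFS, ...) you choose:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object TrackOffsetsExternally {
  // Hypothetical helpers: persist/restore per-partition offsets in your own
  // store. Shown here with a dummy in-memory value for illustration only.
  def loadOffsets(): Map[TopicAndPartition, Long] =
    Map(TopicAndPartition("test_stream", 5) -> 0L)
  def saveOffsets(ranges: Array[OffsetRange]): Unit =
    ranges.foreach(r => println(s"${r.topic}/${r.partition} -> ${r.untilOffset}"))

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("track-offsets-externally"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder

    // Start exactly where the previous run left off.
    val fromOffsets = loadOffsets()
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)

    stream.foreachRDD { rdd =>
      // The cast only works on the RDD produced directly by the stream,
      // before any transformation discards the underlying KafkaRDD.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd here ...
      saveOffsets(ranges) // persist only after the batch's work has succeeded
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Persisting each range's untilOffset after the batch succeeds lets a restart
resume exactly where processing stopped; clamping the loaded offsets to the
earliest offset Kafka still has before passing them in would give the
controlled "skip" asked for in option 1.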


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
Hi Cody,

How should we approach option 2 (restated below)? Which part of the Spark
Kafka Direct code should we look at to handle this issue for our specific
requirements?


2. Catch that exception and somehow force a "reset" for that partition. And
how would that handle the offsets already calculated in the backlog (if
there is one)?

On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger  wrote:

> If you're consistently getting offset out of range exceptions, it's
> probably because messages are getting deleted before you've processed them.
>
> The only real way to deal with this is to give Kafka more retention, consume
> faster, or both.
>
> If you're just looking for a quick "fix" for an infrequent issue, option 4
> is probably the easiest. I wouldn't do that automatically / silently,
> because you're losing data.
>
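On the "reset" in option 2: as far as I know there is no supported way to
reset a single partition inside a running direct stream; the closest
driver-side workaround is to let the context die, then rebuild it without
explicit fromOffsets so that auto.offset.reset=smallest resumes from the
earliest offsets Kafka still retains (everything in between is lost, as Cody
notes). A hedged sketch, again assuming the Spark 1.x API; createContext and
causedByOffsetOutOfRange are helpers written for this example, and the
latter matches by name because, per the logs above, the driver may not even
be able to load the Kafka exception class:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object RestartOnOffsetLoss {
  // Fresh context + direct stream. Without explicit fromOffsets,
  // auto.offset.reset=smallest starts at the earliest offsets still on the broker.
  def createContext(): StreamingContext = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("restart-on-offset-loss"), Seconds(10))
    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092", // placeholder broker
      "auto.offset.reset"    -> "smallest")
    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("test_stream"))
    stream.foreachRDD { rdd => println(s"processed ${rdd.count()} records") }
    ssc
  }

  // Walk the cause chain; match by class name rather than by type.
  def causedByOffsetOutOfRange(t: Throwable): Boolean =
    t != null && (t.toString.contains("OffsetOutOfRangeException") ||
      causedByOffsetOutOfRange(t.getCause))

  def main(args: Array[String]): Unit = {
    var keepGoing = true
    while (keepGoing) {
      val ssc = createContext()
      ssc.start()
      try {
        ssc.awaitTermination() // rethrows whatever exception killed the job
        keepGoing = false
      } catch {
        case e: Throwable if causedByOffsetOutOfRange(e) =>
          ssc.stop(stopSparkContext = true, stopGracefully = false)
        // loop around: the next createContext() resumes from the earliest offsets
        case e: Throwable =>
          ssc.stop(stopSparkContext = true, stopGracefully = false)
          throw e
      }
    }
  }
}
```

As with option 4 in the list above, this knowingly drops the deleted data,
so it should be logged loudly rather than done silently.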

