[jira] [Created] (SPARK-30570) Update scalafmt to 1.0.3 with onlyChangedFiles feature

2020-01-19 Thread Cody Koeninger (Jira)
Cody Koeninger created SPARK-30570:
--

 Summary: Update scalafmt to 1.0.3 with onlyChangedFiles feature
 Key: SPARK-30570
 URL: https://issues.apache.org/jira/browse/SPARK-30570
 Project: Spark
  Issue Type: Improvement
  Components: Build
Affects Versions: 3.0.0
Reporter: Cody Koeninger


[https://github.com/SimonJPegg/mvn_scalafmt/releases/tag/v1.0.3]

added an onlyChangedFiles option, which was one of the things holding back the 
upgrade in SPARK-29293.






[jira] [Created] (SPARK-26177) Automated formatting for Scala code

2018-11-26 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-26177:
--

 Summary: Automated formatting for Scala code
 Key: SPARK-26177
 URL: https://issues.apache.org/jira/browse/SPARK-26177
 Project: Spark
  Issue Type: Improvement
  Components: Build
Affects Versions: 3.0.0
Reporter: Cody Koeninger


Provide an easy way for contributors to run scalafmt only on files that differ 
from git master

See discussion at

[http://apache-spark-developers-list.1001551.n3.nabble.com/Automated-formatting-td25791.html]

 

 






[jira] [Resolved] (SPARK-26121) [Structured Streaming] Allow users to define prefix of Kafka's consumer group (group.id)

2018-11-26 Thread Cody Koeninger (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-26121.

   Resolution: Fixed
 Assignee: Anastasios Zouzias
Fix Version/s: 3.0.0

Resolved by PR https://github.com/apache/spark/pull/23103

> [Structured Streaming] Allow users to define prefix of Kafka's consumer group 
> (group.id)
> 
>
> Key: SPARK-26121
> URL: https://issues.apache.org/jira/browse/SPARK-26121
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Anastasios Zouzias
>Assignee: Anastasios Zouzias
>Priority: Minor
> Fix For: 3.0.0
>
>
> I run in the following situation with Spark Structure Streaming (SS) using 
> Kafka.
>  
> In a project that I work on, there is already a secured Kafka setup where ops 
> can issue an SSL certificate per "group.id", which should be predefined (or at 
> least have a predefined prefix).
>  
> On the other hand, Spark SS fixes the group.id to 
>  
> val uniqueGroupId = 
> s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
>  
> see, i.e.,
>  
> [https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L124]
>  
> https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L81
>  
> I guess Spark developers had a good reason to fix it, but is it possible to 
> make the prefix of the above uniqueGroupId ("spark-kafka-source") 
> configurable?
>  
> The rationale is that spark users would then not be forced to use the same 
> certificate for group ids of the form spark-kafka-source-*.
>  
> DoD:
> * Allow spark SS users to define the group.id prefix as an input parameter.
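
A minimal sketch of how such a prefix could be supplied from a structured 
streaming query, assuming a SparkSession named spark and an option name like 
groupIdPrefix (the option name is an assumption, not something this ticket 
specifies):

{code:java}
// Sketch only: "groupIdPrefix" is an assumed option name, not confirmed by this ticket.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  // generated consumer groups would then look like my-team-<uuid>-<hash>
  // instead of spark-kafka-source-<uuid>-<hash>
  .option("groupIdPrefix", "my-team")
  .load()
{code}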






[jira] [Commented] (SPARK-25983) spark-sql-kafka-0-10 no longer works with Kafka 0.10.0

2018-11-10 Thread Cody Koeninger (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16682464#comment-16682464
 ] 

Cody Koeninger commented on SPARK-25983:


Looks like we need an equivalent warning in 
http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#linking
Feel free to submit a PR, and ask me if you need help

> spark-sql-kafka-0-10 no longer works with Kafka 0.10.0
> --
>
> Key: SPARK-25983
> URL: https://issues.apache.org/jira/browse/SPARK-25983
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Alexander Bessonov
>Priority: Minor
>
> Package {{org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0}} is no longer 
> compatible with {{org.apache.kafka:kafka_2.11:0.10.0.1}}.
> When both packages are used in the same project, the following exception 
> occurs:
> {code:java}
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/protocol/SecurityProtocol
>  at kafka.server.Defaults$.<init>(KafkaConfig.scala:125)
>  at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
>  at kafka.log.Defaults$.<init>(LogConfig.scala:33)
>  at kafka.log.Defaults$.<clinit>(LogConfig.scala)
>  at kafka.log.LogConfig$.<init>(LogConfig.scala:152)
>  at kafka.log.LogConfig$.<clinit>(LogConfig.scala)
>  at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:265)
>  at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
>  at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:759)
>  at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:761)
> {code}
>  
> This exception is caused by an incompatible dependency pulled in by Spark: 
> {{org.apache.kafka:kafka-clients_2.11:2.0.0}}.
>  
> The following workaround could be used to resolve the problem in my project:
> {code:java}
> dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"
> {code}






[jira] [Commented] (SPARK-25983) spark-sql-kafka-0-10 no longer works with Kafka 0.10.0

2018-11-10 Thread Cody Koeninger (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16682461#comment-16682461
 ] 

Cody Koeninger commented on SPARK-25983:


Documentation already explicitly says not to link to kafka-clients

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#linking
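
A minimal sbt sketch of that linking guidance, assuming Spark 2.4.0 on Scala 
2.11 (the versions are placeholders): depend on the Spark Kafka module only and 
let it pull a compatible kafka-clients transitively.

{code:java}
// build.sbt sketch: do not add org.apache.kafka:kafka-clients yourself.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0"
// For Structured Streaming the analogous module would be:
// libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
{code}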

> spark-sql-kafka-0-10 no longer works with Kafka 0.10.0
> --
>
> Key: SPARK-25983
> URL: https://issues.apache.org/jira/browse/SPARK-25983
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Alexander Bessonov
>Priority: Minor
>
> Package {{org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0}} is no longer 
> compatible with {{org.apache.kafka:kafka_2.11:0.10.0.1}}.
> When both packages are used in the same project, the following exception 
> occurs:
> {code:java}
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/protocol/SecurityProtocol
>  at kafka.server.Defaults$.<init>(KafkaConfig.scala:125)
>  at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
>  at kafka.log.Defaults$.<init>(LogConfig.scala:33)
>  at kafka.log.Defaults$.<clinit>(LogConfig.scala)
>  at kafka.log.LogConfig$.<init>(LogConfig.scala:152)
>  at kafka.log.LogConfig$.<clinit>(LogConfig.scala)
>  at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:265)
>  at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
>  at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:759)
>  at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:761)
> {code}
>  
> This exception is caused by an incompatible dependency pulled in by Spark: 
> {{org.apache.kafka:kafka-clients_2.11:2.0.0}}.
>  
> The following workaround could be used to resolve the problem in my project:
> {code:java}
> dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"
> {code}






[jira] [Resolved] (SPARK-25233) Give the user the option of specifying a fixed minimum message per partition per batch when using kafka direct API with backpressure

2018-08-30 Thread Cody Koeninger (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-25233.

   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 3
[https://github.com/apache/spark/pull/3]

> Give the user the option of specifying a fixed minimum message per partition 
> per batch when using kafka direct API with backpressure
> 
>
> Key: SPARK-25233
> URL: https://issues.apache.org/jira/browse/SPARK-25233
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.4.0
>Reporter: Reza Safi
>Assignee: Reza Safi
>Priority: Major
> Fix For: 2.4.0
>
>
> After SPARK-18371, it is guaranteed that there will be at least *one* 
> message per partition per batch when using the direct kafka API with 
> backpressure and new messages exist in the topics. It would be better if the 
> user had the option of setting that minimum instead of a hard-coded limit of 1.
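
A minimal configuration sketch of the requested knob, assuming a property name 
like spark.streaming.kafka.minRatePerPartition (the key name is an assumption; 
check the Kafka integration guide for the Spark version you run):

{code:java}
// Sketch: combine backpressure with a per-partition rate floor.
// "spark.streaming.kafka.minRatePerPartition" is an assumed key for the new minimum.
val conf = new org.apache.spark.SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
  .set("spark.streaming.kafka.minRatePerPartition", "100")
{code}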






[jira] [Assigned] (SPARK-25233) Give the user the option of specifying a fixed minimum message per partition per batch when using kafka direct API with backpressure

2018-08-30 Thread Cody Koeninger (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger reassigned SPARK-25233:
--

Assignee: Reza Safi

> Give the user the option of specifying a fixed minimum message per partition 
> per batch when using kafka direct API with backpressure
> 
>
> Key: SPARK-25233
> URL: https://issues.apache.org/jira/browse/SPARK-25233
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.4.0
>Reporter: Reza Safi
>Assignee: Reza Safi
>Priority: Major
>
> After SPARK-18371, it is guaranteed that there will be at least *one* 
> message per partition per batch when using the direct kafka API with 
> backpressure and new messages exist in the topics. It would be better if the 
> user had the option of setting that minimum instead of a hard-coded limit of 1.






[jira] [Commented] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors

2018-08-05 Thread Cody Koeninger (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569491#comment-16569491
 ] 

Cody Koeninger commented on SPARK-24987:


It was merged to branch-2.3

https://github.com/apache/spark/commits/branch-2.3



> Kafka Cached Consumer Leaking File Descriptors
> --
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>  
>Reporter: Yuval Itzchakov
>Assignee: Yuval Itzchakov
>Priority: Critical
> Fix For: 2.4.0
>
>
> Setup:
>  * Spark 2.3.1
>  * Java 1.8.0 (112)
>  * Standalone Cluster Manager
>  * 3 Nodes, 1 Executor per node.
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers 
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
>  via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, haven't 
> been able to find the root cause as of yet) where cached consumers remain "in 
> use" throughout the lifetime of the task and are never released. This can be 
> identified by the following line of the stack trace:
> at 
> org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460)
> Which points to:
> {code:java}
> } else if (existingInternalConsumer.inUse) {
>   // If consumer is already cached but is currently in use, then return a new 
> consumer
>   NonCachedKafkaDataConsumer(newInternalConsumer)
> {code}
>  Meaning the existing consumer created for that `TopicPartition` is still in 
> use for some reason. The weird thing is that you can see this for very old 
> tasks which have already finished successfully.
> I've traced down this leak using file leak detector, attaching it to the 
> running Executor JVM process. I've emitted the list of open file descriptors 
> which [you can find 
> here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
>  and you can see that the majority of them are epoll FD used by Kafka 
> Consumers, indicating that they aren't closing.
>  Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation",
> sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination(){code}






[jira] [Commented] (SPARK-25026) Binary releases don't contain Kafka integration modules

2018-08-04 Thread Cody Koeninger (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569283#comment-16569283
 ] 

Cody Koeninger commented on SPARK-25026:


I don't think distributions have ever included the stuff under external/

A quick look back at the 1.6 and 2.1 distributions shows they didn't have kafka 
(or flume, or kinesis, etc).

People typically just add those to their app's assembly jars, afaik.
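
For example, a minimal sbt sketch of that approach, assuming Spark 2.4.0 on 
Scala 2.11 (versions and modules are placeholders): core Spark stays "provided" 
by the cluster, while the Kafka integration module is bundled into the 
application's assembly jar.

{code:java}
// build.sbt sketch for an app assembly that bundles the Kafka module itself.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
)
{code}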

 

> Binary releases don't contain Kafka integration modules
> ---
>
> Key: SPARK-25026
> URL: https://issues.apache.org/jira/browse/SPARK-25026
> Project: Spark
>  Issue Type: Bug
>  Components: Build, Structured Streaming
>Affects Versions: 2.2.2, 2.3.1, 2.4.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Blocker
>







[jira] [Resolved] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors

2018-08-04 Thread Cody Koeninger (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-24987.

   Resolution: Fixed
Fix Version/s: 2.4.0

> Kafka Cached Consumer Leaking File Descriptors
> --
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>  
>Reporter: Yuval Itzchakov
>Assignee: Yuval Itzchakov
>Priority: Critical
> Fix For: 2.4.0
>
>
> Setup:
>  * Spark 2.3.1
>  * Java 1.8.0 (112)
>  * Standalone Cluster Manager
>  * 3 Nodes, 1 Executor per node.
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers 
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
>  via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, haven't 
> been able to find the root cause as of yet) where cached consumers remain "in 
> use" throughout the lifetime of the task and are never released. This can be 
> identified by the following line of the stack trace:
> at 
> org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460)
> Which points to:
> {code:java}
> } else if (existingInternalConsumer.inUse) {
>   // If consumer is already cached but is currently in use, then return a new 
> consumer
>   NonCachedKafkaDataConsumer(newInternalConsumer)
> {code}
>  Meaning the existing consumer created for that `TopicPartition` is still in 
> use for some reason. The weird thing is that you can see this for very old 
> tasks which have already finished successfully.
> I've traced down this leak using file leak detector, attaching it to the 
> running Executor JVM process. I've emitted the list of open file descriptors 
> which [you can find 
> here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
>  and you can see that the majority of them are epoll FD used by Kafka 
> Consumers, indicating that they aren't closing.
>  Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation",
> sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination(){code}






[jira] [Assigned] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors

2018-08-04 Thread Cody Koeninger (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger reassigned SPARK-24987:
--

Assignee: Yuval Itzchakov

> Kafka Cached Consumer Leaking File Descriptors
> --
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>  
>Reporter: Yuval Itzchakov
>Assignee: Yuval Itzchakov
>Priority: Critical
>
> Setup:
>  * Spark 2.3.1
>  * Java 1.8.0 (112)
>  * Standalone Cluster Manager
>  * 3 Nodes, 1 Executor per node.
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers 
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
>  via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, haven't 
> been able to find the root cause as of yet) where cached consumers remain "in 
> use" throughout the lifetime of the task and are never released. This can be 
> identified by the following line of the stack trace:
> at 
> org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460)
> Which points to:
> {code:java}
> } else if (existingInternalConsumer.inUse) {
>   // If consumer is already cached but is currently in use, then return a new 
> consumer
>   NonCachedKafkaDataConsumer(newInternalConsumer)
> {code}
>  Meaning the existing consumer created for that `TopicPartition` is still in 
> use for some reason. The weird thing is that you can see this for very old 
> tasks which have already finished successfully.
> I've traced down this leak using file leak detector, attaching it to the 
> running Executor JVM process. I've emitted the list of open file descriptors 
> which [you can find 
> here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
>  and you can see that the majority of them are epoll FD used by Kafka 
> Consumers, indicating that they aren't closing.
>  Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation",
> sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination(){code}






[jira] [Resolved] (SPARK-24713) AppMaster of spark streaming kafka OOM if there are hundreds of topics consumed

2018-07-13 Thread Cody Koeninger (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-24713.

   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21690
[https://github.com/apache/spark/pull/21690]

> AppMaster of spark streaming kafka OOM if there are hundreds of topics 
> consumed
> ---
>
> Key: SPARK-24713
> URL: https://issues.apache.org/jira/browse/SPARK-24713
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.3.1
>Reporter: Yuanbo Liu
>Assignee: Yuanbo Liu
>Priority: Major
> Fix For: 2.4.0
>
>
> We have hundreds of kafka topics that need to be consumed in one application. 
> The application master will throw an OOM exception after hanging for nearly 
> half an hour.






[jira] [Assigned] (SPARK-24713) AppMaster of spark streaming kafka OOM if there are hundreds of topics consumed

2018-07-13 Thread Cody Koeninger (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger reassigned SPARK-24713:
--

Assignee: Yuanbo Liu

> AppMaster of spark streaming kafka OOM if there are hundreds of topics 
> consumed
> ---
>
> Key: SPARK-24713
> URL: https://issues.apache.org/jira/browse/SPARK-24713
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.3.1
>Reporter: Yuanbo Liu
>Assignee: Yuanbo Liu
>Priority: Major
> Fix For: 2.4.0
>
>
> We have hundreds of kafka topics that need to be consumed in one application. 
> The application master will throw an OOM exception after hanging for nearly 
> half an hour.






[jira] [Commented] (SPARK-19680) Offsets out of range with no configured reset policy for partitions

2018-07-11 Thread Cody Koeninger (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540138#comment-16540138
 ] 

Cody Koeninger commented on SPARK-19680:


A new consumer group is the easiest thing to do, but if the only place
you're storing offsets is in kafka, using a normal consumer with the
same group id and seeking to whatever positions you want, then
committing, should also work.

Executor consumers shouldn't be storing offsets anywhere; they're just
there to do what the driver tells them to.
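
A minimal sketch of that reset procedure with a plain Kafka consumer; the 
broker, group id, topic, partition, and offset below are placeholders (the 
partition and offset simply echo the error quoted further down), and it assumes 
the streaming job is stopped while this runs:

{code:java}
import java.util.Properties
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")
props.put("group.id", "same-group-id-as-the-streaming-job")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "false")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("SearchEvents", 2)
consumer.assign(java.util.Arrays.asList(tp))
// Seek to whatever position you want, then commit it for the group.
consumer.seek(tp, 161803385L)
consumer.commitSync(Map(tp -> new OffsetAndMetadata(161803385L)).asJava)
consumer.close()
{code}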



> Offsets out of range with no configured reset policy for partitions
> ---
>
> Key: SPARK-19680
> URL: https://issues.apache.org/jira/browse/SPARK-19680
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Schakmann Rene
>Priority: Major
>
> I'm using spark streaming with kafka to actually create a toplist. I want to 
> read all the messages in kafka. So I set
>"auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our spark cluster it is not working; I 
> get:
> Error:
> {code:title=error.log|borderStyle=solid}
>   Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, 
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, 
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {SearchEvents-2=161803385}
> {code}
> This is somehow wrong because I did set the auto.offset.reset property
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, 
> Object] = {
> Map(
>   "bootstrap.servers" -> 
> properties.getProperty("kafka.bootstrap.servers"),
>   "group.id" -> properties.getProperty("kafka.consumer.group"),
>   "auto.offset.reset" -> "earliest",
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "enable.auto.commit" -> "false",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, 
> Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: 
> Broadcast[KafkaSink[TopList]]): Unit = {
> getFilteredStream(stream.map(_.value()), windowDuration, 
> slideDuration).foreachRDD(rdd => {
>   val topList = new TopList
>   topList.setCreated(new Date())
>   topList.setTopListEntryList(rdd.take(TopListLength).toList)
>   CurrentLogger.info("TopList length: " + 
> topList.getTopListEntryList.size().toString)
>   kafkaSink.value.send(SendToTopicName, topList)
>   CurrentLogger.info("Last Run: " + System.currentTimeMillis())
> })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, 
> slideDuration: Int): DStream[TopListEntry] = {
> val Mapper = MapperObject.readerFor[SearchEventDTO]
> result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>   .filter(s => s != null && s.getSearchRequest != null && 
> s.getSearchRequest.getSearchParameters != null && s.getVertical == 
> Vertical.BAP && 
> s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>   .map(row => {
> val name = 
> row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
> (name, new TopListEntry(name, 1, row.getResultCount))
>   })
>   .reduceByKeyAndWindow(
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + 
> b.getMeanSearchHits),
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - 
> b.getMeanSearchHits),
> Minutes(windowDuration),
> Seconds(slideDuration))
>   .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>   .map(row => (row._2.getSearchCount, row._2))
>   .transform(rdd => rdd.sortByKey(ascending = false))
>   .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, 
> row._2.getMeanSearchHits / row._2.getSearchCount))
>   }
>   def main(properties: Properties): Unit = {
> val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
> val kafkaSink = 
> sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
> val kafkaParams: Map[String, Object] = 
> SparkUtil.getDefaultKafkaReceiverParameter(properties)
> val ssc = new 

[jira] [Commented] (SPARK-24720) kafka transaction creates Non-consecutive Offsets (due to transaction offset) making streaming fail when failOnDataLoss=true

2018-07-06 Thread Cody Koeninger (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534797#comment-16534797
 ] 

Cody Koeninger commented on SPARK-24720:


What's your plan to tell the difference between gaps due to transactions and 
gaps due to data loss?

> kafka transaction creates Non-consecutive Offsets (due to transaction offset) 
> making streaming fail when failOnDataLoss=true
> 
>
> Key: SPARK-24720
> URL: https://issues.apache.org/jira/browse/SPARK-24720
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Quentin Ambard
>Priority: Major
>
> When kafka transactions are used, sending 1 message to kafka will result in 1 
> offset for the data + 1 offset to mark the transaction.
> When the kafka connector for spark streaming reads a topic with non-consecutive 
> offsets, it leads to a failure. SPARK-17147 fixed this issue for compacted 
> topics.
>  However, SPARK-17147 doesn't fix this issue for kafka transactions: if 1 
> message + 1 transaction commit are in a partition, spark will try to read 
> offsets [0, 2). Offset 0 (containing the message) will be read, but offset 1 
> won't return a value and buffer.hasNext() will be false even after a poll, 
> since no data are present for offset 1 (it's the transaction commit).
>  
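
To make the offset arithmetic concrete, a minimal transactional-producer 
sketch, assuming an empty single-partition topic "t" and a placeholder broker: 
the commit marker occupies its own offset, which is exactly the gap described 
above.

{code:java}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")
props.put("transactional.id", "demo-tx-1")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.initTransactions()
producer.beginTransaction()
producer.send(new ProducerRecord("t", "k", "v")) // the data record lands at offset 0
producer.commitTransaction()                     // the commit marker occupies offset 1
producer.close()
// A reader of partition t-0 then sees an end offset of 2 but only one readable record.
{code}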






[jira] [Resolved] (SPARK-24743) Update the JavaDirectKafkaWordCount example to support the new API of Kafka

2018-07-05 Thread Cody Koeninger (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-24743.

   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21717
[https://github.com/apache/spark/pull/21717]

> Update the JavaDirectKafkaWordCount example to support the new API of Kafka
> ---
>
> Key: SPARK-24743
> URL: https://issues.apache.org/jira/browse/SPARK-24743
> Project: Spark
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 2.3.1
>Reporter: luochuan
>Assignee: luochuan
>Priority: Minor
> Fix For: 2.4.0
>
>
> when I ran the example JavaDirectKafkaWordCount as follows:
> ./bin/spark-submit --class 
> org.apache.spark.examples.streaming.JavaDirectKafkaWordCount --master local 
> --jars /somepath/spark-streaming-kafka-0-10-assembly_2.11-2.3.1.jar 
> examples/jars/spark-examples_2.11-2.3.1.jar kafka-broker:port topic
> Then an error happened: 
> 'org.apache.kafka.common.config.ConfigException: Missing required 
> configuration "bootstrap.servers" which has no default value'. So I looked 
> into the code and found the example JavaDirectKafkaWordCount is missing some 
> required kafka configs.
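
For reference, a minimal sketch of the consumer parameters the direct stream 
needs (broker, topic, and group id are placeholders; ssc is assumed to be an 
existing StreamingContext), the first entry being the configuration the error 
message complains about:

{code:java}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka-broker:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "direct-kafka-word-count"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, // an existing org.apache.spark.streaming.StreamingContext
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topic"), kafkaParams)
)
{code}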






[jira] [Commented] (SPARK-18258) Sinks need access to offset representation

2018-06-27 Thread Cody Koeninger (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525137#comment-16525137
 ] 

Cody Koeninger commented on SPARK-18258:


[~Yohan123] This ticket is about giving implementors of addBatch access to 
actual offsets, not about what any particular implementation ends up doing with 
them.  A database sink is the use case I had in mind.  For a Kafka sink, it 
might be that putting additional information in the value was the only 
realistic option, but that's a use case specific concern.

If I misunderstood your question let me know.

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> Transactional "exactly-once" semantics for output require storing an offset 
> identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not 
> the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as 
> the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for 
> the starting and ending offsets (either the offsets themselves, or the 
> SPARK-17829 string/json representation).  That would be an API change, but if 
> there's another way to map batch ids to offset representations without 
> changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a 
> job as e.g. the Kafka dstream gives, because Sinks are the main place that 
> should need them.
> After SPARK-17829 is complete and offsets have a .json method, an api for 
> this ticket might look like
> {code}
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: 
> OffsetSeq): Unit
> }
> {code}
> where start and end were provided by StreamExecution.runBatch using 
> committedOffsets and availableOffsets.  
> I'm not 100% certain that the offsets in the seq could always be mapped back 
> to the correct source when restarting complicated multi-source jobs, but I 
> think it'd be sufficient.  Passing the string/json representation of the seq 
> instead of the seq itself would probably be sufficient as well, but the 
> convention of rendering a None as "-" in the json is maybe a little 
> idiosyncratic to parse, and the constant defining that is private.
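
As a use-case illustration, a minimal JDBC-flavoured sketch of the proposed 
signature, where a batch's rows and its offset range are written in one 
database transaction; the string/json offset form and the writeRows helper are 
assumptions for illustration, not part of any existing API:

{code:java}
// Sketch of a sink using the proposed offset-aware addBatch (illustrative only).
class OffsetTrackingJdbcSink(conn: java.sql.Connection) {
  def addBatch(batchId: Long, data: org.apache.spark.sql.DataFrame,
               startJson: String, endJson: String): Unit = {
    conn.setAutoCommit(false)
    try {
      writeRows(conn, data) // hypothetical helper that inserts the batch's rows
      val ps = conn.prepareStatement(
        "insert into stream_offsets (batch_id, start_offsets, end_offsets) values (?, ?, ?)")
      ps.setLong(1, batchId)
      ps.setString(2, startJson)
      ps.setString(3, endJson)
      ps.executeUpdate()
      conn.commit() // results and offsets become visible atomically
    } catch {
      case e: Throwable => conn.rollback(); throw e
    }
  }

  private def writeRows(conn: java.sql.Connection, df: org.apache.spark.sql.DataFrame): Unit = ???
}
{code}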






[jira] [Commented] (SPARK-24507) Description in "Level of Parallelism in Data Receiving" section of Spark Streaming Programming Guide is not relevant for the recent Kafka direct approach

2018-06-20 Thread Cody Koeninger (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518287#comment-16518287
 ] 

Cody Koeninger commented on SPARK-24507:


You're welcome to submit a doc PR that clarifies that section is referring to 
receiver-based implementations, and links to the kafka integration guide for 
the direct stream.

 

I don't think it makes sense to remove it altogether, as there are still 
multiple other implementations that extend Receiver.

> Description in "Level of Parallelism in Data Receiving" section of Spark 
> Streaming Programming Guide is not relevant for the recent Kafka direct 
> approach 
> ---
>
> Key: SPARK-24507
> URL: https://issues.apache.org/jira/browse/SPARK-24507
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation, DStreams
>Affects Versions: 2.3.1
>Reporter: Lev Greenberg
>Priority: Minor
>
> [https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#level-of-parallelism-in-data-receiving]
>  describes deprecated "Receiver-based approach" e.g.,:
> _"... each input DStream creates a single receiver (running on a worker 
> machine) that receives a single stream of data"._
> _The first paragraph of 
> [level-of-parallelism-in-data-receiving|https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#level-of-parallelism-in-data-receiving]
>  section including the following code example should be removed._






[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 0.10.0.1 to 1.1.0

2018-05-31 Thread Cody Koeninger (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496633#comment-16496633
 ] 

Cody Koeninger commented on SPARK-18057:


I'd just modify KafkaTestUtils to match the way things were
reorganized in the Kafka project.



> Update structured streaming kafka from 0.10.0.1 to 1.1.0
> 
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Commented] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))

2018-05-14 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16474549#comment-16474549
 ] 

Cody Koeninger commented on SPARK-24067:


[~zsxwing] even in situations where users weren't enabling compaction, kafka is 
producing non-consecutive offsets, meaning this is a bug that prevents 
previously working jobs from being able to complete.

> Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle 
> Non-consecutive Offsets (i.e. Log Compaction))
> 
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.3.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
> Fix For: 2.3.1
>
>
> SPARK-17147 fixes a problem with non-consecutive Kafka Offsets. The 
> [PR|https://github.com/apache/spark/pull/20572] was merged to {{master}}. This 
> should be backported to 2.3.
>  
> Original Description from SPARK-17147 :
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.
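
A minimal sketch of the iteration change being described, with a hypothetical 
fetch helper standing in for the consumer plumbing:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord

// Advance using the offset Kafka actually returned, so gaps left by log
// compaction (or transaction markers) don't violate a +1 assumption.
def readRange(fetch: Long => ConsumerRecord[Array[Byte], Array[Byte]],
              startOffset: Long, untilOffset: Long): Unit = {
  var nextOffset = startOffset
  while (nextOffset < untilOffset) {
    val record = fetch(nextOffset) // hypothetical: next available record at or after nextOffset
    // process(record) ...
    nextOffset = record.offset + 1 // rather than nextOffset += 1
  }
}
{code}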






[jira] [Resolved] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))

2018-05-11 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-24067.

   Resolution: Fixed
Fix Version/s: 2.3.1

Issue resolved by pull request 21300
[https://github.com/apache/spark/pull/21300]

> Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle 
> Non-consecutive Offsets (i.e. Log Compaction))
> 
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.3.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
> Fix For: 2.3.1
>
>
> SPARK-17147 fixes a problem with non-consecutive Kafka Offsets. The 
> [PR|https://github.com/apache/spark/pull/20572] was merged to {{master}}. This 
> should be backported to 2.3.
>  
> Original Description from SPARK-17147 :
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.






[jira] [Commented] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))

2018-04-25 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16452603#comment-16452603
 ] 

Cody Koeninger commented on SPARK-24067:


The original PR [https://github.com/apache/spark/pull/20572] merges cleanly 
against 2.3, are you ok with merging it?

> Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle 
> Non-consecutive Offsets (i.e. Log Compaction))
> 
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.3.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
>
> SPARK-17147 fixes a problem with non-consecutive Kafka Offsets. The 
> [PR|https://github.com/apache/spark/pull/20572] was merged to {{master}}. This 
> should be backported to 2.3.
>  
> Original Description from SPARK-17147 :
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.






[jira] [Commented] (SPARK-24067) Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction))

2018-04-25 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16452337#comment-16452337
 ] 

Cody Koeninger commented on SPARK-24067:


Given the response on the dev list about criteria for backporting, I think this 
is in a grey area.

In one sense, it's a new option.  But there have been reports of 
non-consecutive offsets even in normal non-compacted topics, which totally 
break jobs, so this could be seen as a critical bug.

I'd lean towards merging it to the 2.3 branch, but because I'm the one who 
wrote the code, I'm not comfortable making that call on my own. 

[~srowen] you merged the original PR, do you want to weigh in?

> Backport SPARK-17147 to 2.3 (Spark Streaming Kafka 0.10 Consumer Can't Handle 
> Non-consecutive Offsets (i.e. Log Compaction))
> 
>
> Key: SPARK-24067
> URL: https://issues.apache.org/jira/browse/SPARK-24067
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.3.0
>Reporter: Joachim Hereth
>Assignee: Cody Koeninger
>Priority: Major
>
> SPARK-17147 fixes a problem with non-consecutive Kafka Offsets. The 
> [PR|https://github.com/apache/spark/pull/20572] was merged to {{master}}. This 
> should be backported to 2.3.
>  
> Original Description from SPARK-17147 :
>  
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
>  {{nextOffset = offset + 1}}
>  to:
>  {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
>  {{requestOffset += 1}}
>  to:
>  {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.






[jira] [Resolved] (SPARK-21168) KafkaRDD should always set kafka clientId.

2018-04-23 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-21168.

   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 19887
[https://github.com/apache/spark/pull/19887]

> KafkaRDD should always set kafka clientId.
> --
>
> Key: SPARK-21168
> URL: https://issues.apache.org/jira/browse/SPARK-21168
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
>Reporter: Xingxing Di
>Assignee: liuzhaokun
>Priority: Trivial
> Fix For: 2.4.0
>
>
> I found that KafkaRDD does not set the kafka client.id in the "fetchBatch" 
> method (FetchRequestBuilder sets clientId to empty by default). Normally this 
> affects nothing, but in our case we use the clientId on the kafka server side, 
> so we had to rebuild spark-streaming-kafka.






[jira] [Assigned] (SPARK-21168) KafkaRDD should always set kafka clientId.

2018-04-23 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger reassigned SPARK-21168:
--

Assignee: liuzhaokun

> KafkaRDD should always set kafka clientId.
> --
>
> Key: SPARK-21168
> URL: https://issues.apache.org/jira/browse/SPARK-21168
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
>Reporter: Xingxing Di
>Assignee: liuzhaokun
>Priority: Trivial
> Fix For: 2.4.0
>
>
> I found that KafkaRDD does not set the kafka client.id in the "fetchBatch" 
> method (FetchRequestBuilder sets clientId to empty by default). Normally this 
> affects nothing, but in our case we use the clientId on the kafka server side, 
> so we had to rebuild spark-streaming-kafka.






[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 0.10.0.1 to 1.1.0

2018-04-18 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16442838#comment-16442838
 ] 

Cody Koeninger commented on SPARK-18057:


[~cricket007] here's a branch with spark 2.1.1 / kafka 0.11.0.2 if you want to 
try it out and see if it fixes your issue  
[https://github.com/koeninger/spark-1/tree/kafka-dependency-update-2.1.1]

I assume you'll need to skip tests in order to build it; I didn't do anything 
more than updating the dependency + fixing compile errors.

> Update structured streaming kafka from 0.10.0.1 to 1.1.0
> 
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 0.10.0.1 to 1.1.0

2018-04-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16441818#comment-16441818
 ] 

Cody Koeninger commented on SPARK-18057:


Ok, if you can figure out what version of spark it is, I can help with getting 
a branch with the updated dependency.

> Update structured streaming kafka from 0.10.0.1 to 1.1.0
> 
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 0.10.0.1 to 1.1.0

2018-04-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16441793#comment-16441793
 ] 

Cody Koeninger commented on SPARK-18057:


Just adding the extra dependency on 0.11 probably won't work, since there were 
some actual code changes, not just a dependency bump.  As long as you aren't 
deleting topics, the blocker KAFKA-4879 issue shouldn't affect you trying out a 
branch with the changes, and it wouldn't require changing your spark 
deployment, just the app assembly.

[~sliebau] [~Yohan123] do you have an up to date branch of spark with the kafka 
client dependency change that Jordan can try out?  If not I can make one.

> Update structured streaming kafka from 0.10.0.1 to 1.1.0
> 
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html






[jira] [Resolved] (SPARK-22968) java.lang.IllegalStateException: No current assignment for partition kssh-2

2018-04-17 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-22968.

   Resolution: Fixed
Fix Version/s: 2.4.0

Issue resolved by pull request 21038
[https://github.com/apache/spark/pull/21038]

> java.lang.IllegalStateException: No current assignment for partition kssh-2
> ---
>
> Key: SPARK-22968
> URL: https://issues.apache.org/jira/browse/SPARK-22968
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
> Environment: Kafka:  0.10.0  (CDH5.12.0)
> Apache Spark 2.1.1
> Spark streaming+Kafka
>Reporter: Jepson
>Assignee: Saisai Shao
>Priority: Major
> Fix For: 2.4.0
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> *Kafka Broker:*
> {code:java}
>message.max.bytes : 2621440  
> {code}
> *Spark Streaming+Kafka Code:*
> {code:java}
> , "max.partition.fetch.bytes" -> (5242880: java.lang.Integer) //default: 
> 1048576
> , "request.timeout.ms" -> (9: java.lang.Integer) //default: 6
> , "session.timeout.ms" -> (6: java.lang.Integer) //default: 3
> , "heartbeat.interval.ms" -> (5000: java.lang.Integer)
> , "receive.buffer.bytes" -> (10485760: java.lang.Integer)
> {code}
> *Error message:*
> {code:java}
> 8/01/05 09:48:27 INFO internals.ConsumerCoordinator: Revoking previously 
> assigned partitions [kssh-7, kssh-4, kssh-3, kssh-6, kssh-5, kssh-0, kssh-2, 
> kssh-1] for group use_a_separate_group_id_for_each_stream
> 18/01/05 09:48:27 INFO internals.AbstractCoordinator: (Re-)joining group 
> use_a_separate_group_id_for_each_stream
> 18/01/05 09:48:27 INFO internals.AbstractCoordinator: Successfully joined 
> group use_a_separate_group_id_for_each_stream with generation 4
> 18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Setting newly assigned 
> partitions [kssh-7, kssh-4, kssh-6, kssh-5] for group 
> use_a_separate_group_id_for_each_stream
> 18/01/05 09:48:27 ERROR scheduler.JobScheduler: Error generating jobs for 
> time 1515116907000 ms
> java.lang.IllegalStateException: No current assignment for partition kssh-2
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:295)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1169)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>   at scala.Option.orElse(Option.scala:289)
>   at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>   at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>   at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>   at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>   at 
> 

[jira] [Assigned] (SPARK-22968) java.lang.IllegalStateException: No current assignment for partition kssh-2

2018-04-17 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger reassigned SPARK-22968:
--

Assignee: Saisai Shao

> java.lang.IllegalStateException: No current assignment for partition kssh-2
> ---
>
> Key: SPARK-22968
> URL: https://issues.apache.org/jira/browse/SPARK-22968
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.1
> Environment: Kafka:  0.10.0  (CDH5.12.0)
> Apache Spark 2.1.1
> Spark streaming+Kafka
>Reporter: Jepson
>Assignee: Saisai Shao
>Priority: Major
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> *Kafka Broker:*
> {code:java}
>message.max.bytes : 2621440  
> {code}
> *Spark Streaming+Kafka Code:*
> {code:java}
> , "max.partition.fetch.bytes" -> (5242880: java.lang.Integer) //default: 
> 1048576
> , "request.timeout.ms" -> (9: java.lang.Integer) //default: 6
> , "session.timeout.ms" -> (6: java.lang.Integer) //default: 3
> , "heartbeat.interval.ms" -> (5000: java.lang.Integer)
> , "receive.buffer.bytes" -> (10485760: java.lang.Integer)
> {code}
> *Error message:*
> {code:java}
> 18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Revoking previously 
> assigned partitions [kssh-7, kssh-4, kssh-3, kssh-6, kssh-5, kssh-0, kssh-2, 
> kssh-1] for group use_a_separate_group_id_for_each_stream
> 18/01/05 09:48:27 INFO internals.AbstractCoordinator: (Re-)joining group 
> use_a_separate_group_id_for_each_stream
> 18/01/05 09:48:27 INFO internals.AbstractCoordinator: Successfully joined 
> group use_a_separate_group_id_for_each_stream with generation 4
> 18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Setting newly assigned 
> partitions [kssh-7, kssh-4, kssh-6, kssh-5] for group 
> use_a_separate_group_id_for_each_stream
> 18/01/05 09:48:27 ERROR scheduler.JobScheduler: Error generating jobs for 
> time 1515116907000 ms
> java.lang.IllegalStateException: No current assignment for partition kssh-2
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:295)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1169)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
>   at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>   at scala.Option.orElse(Option.scala:289)
>   at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>   at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>   at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>   at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>   at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>   at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>   at scala.util.Try$.apply(Try.scala:192)
>

[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 0.10.0.1 to 1.1.0

2018-04-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16441606#comment-16441606
 ] 

Cody Koeninger commented on SPARK-18057:


Out of curiosity, was that a compacted topic?



> Update structured streaming kafka from 0.10.0.1 to 1.1.0
> 
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19680) Offsets out of range with no configured reset policy for partitions

2018-04-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16433039#comment-16433039
 ] 

Cody Koeninger commented on SPARK-19680:


[~nerdynick]  If you submit a PR to add documentation I'd be happy to review it.

IMHO something like KAFKA-3370 is really where this issue "should" be fixed.  
In the absence of that, I think using time-based indexes would be the best 
workaround for making jobs easier to start.  If you have a constructive 
alternative suggestion that doesn't conflate offset reset at startup with 
silently losing data in the middle of a running app, I'd be happy to discuss it.
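
A minimal sketch of the time-index lookup mentioned above, assuming kafka-clients 0.10.1+; `kafkaParams`, `partitions` and `startTimestampMs` are placeholder names, not part of any Spark API:

{code:java}
import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Resolve starting offsets from a timestamp via the Kafka time index.
// Partitions with no offset at or after the timestamp come back as null
// and are simply skipped here.
def offsetsForTime(
    kafkaParams: ju.Map[String, Object],
    partitions: Seq[TopicPartition],
    startTimestampMs: Long): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
  try {
    val query = partitions.map(tp => tp -> java.lang.Long.valueOf(startTimestampMs)).toMap.asJava
    consumer.offsetsForTimes(query).asScala.collect {
      case (tp, oat) if oat != null => tp -> oat.offset()
    }.toMap
  } finally {
    consumer.close()
  }
}
{code}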

> Offsets out of range with no configured reset policy for partitions
> ---
>
> Key: SPARK-19680
> URL: https://issues.apache.org/jira/browse/SPARK-19680
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Schakmann Rene
>Priority: Major
>
> I'm using Spark Streaming with Kafka to actually create a top list. I want to 
> read all the messages in kafka. So I set
>"auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our Spark cluster it is not working; I 
> get:
> Error:
> {code:title=error.log|borderStyle=solid}
>   Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, 
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, 
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {SearchEvents-2=161803385}
> {code}
> This is somehow wrong because I did set the auto.offset.reset property
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, 
> Object] = {
> Map(
>   "bootstrap.servers" -> 
> properties.getProperty("kafka.bootstrap.servers"),
>   "group.id" -> properties.getProperty("kafka.consumer.group"),
>   "auto.offset.reset" -> "earliest",
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "enable.auto.commit" -> "false",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, 
> Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: 
> Broadcast[KafkaSink[TopList]]): Unit = {
> getFilteredStream(stream.map(_.value()), windowDuration, 
> slideDuration).foreachRDD(rdd => {
>   val topList = new TopList
>   topList.setCreated(new Date())
>   topList.setTopListEntryList(rdd.take(TopListLength).toList)
>   CurrentLogger.info("TopList length: " + 
> topList.getTopListEntryList.size().toString)
>   kafkaSink.value.send(SendToTopicName, topList)
>   CurrentLogger.info("Last Run: " + System.currentTimeMillis())
> })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, 
> slideDuration: Int): DStream[TopListEntry] = {
> val Mapper = MapperObject.readerFor[SearchEventDTO]
> result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>   .filter(s => s != null && s.getSearchRequest != null && 
> s.getSearchRequest.getSearchParameters != null && s.getVertical == 
> Vertical.BAP && 
> s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>   .map(row => {
> val name = 
> row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
> (name, new TopListEntry(name, 1, row.getResultCount))
>   })
>   .reduceByKeyAndWindow(
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + 
> b.getMeanSearchHits),
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - 
> b.getMeanSearchHits),
> Minutes(windowDuration),
> Seconds(slideDuration))
>   .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>   .map(row => (row._2.getSearchCount, row._2))
>   .transform(rdd => rdd.sortByKey(ascending = false))
>   .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, 
> row._2.getMeanSearchHits / row._2.getSearchCount))
>   }
>   def main(properties: Properties): Unit = {
> val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
> val kafkaSink = 
> sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
> val 

[jira] [Commented] (SPARK-23739) Spark structured streaming long running problem

2018-03-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16414326#comment-16414326
 ] 

Cody Koeninger commented on SPARK-23739:


Ok, the OutOfMemoryError is probably a separate and unrelated issue.



> Spark structured streaming long running problem
> ---
>
> Key: SPARK-23739
> URL: https://issues.apache.org/jira/browse/SPARK-23739
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Florencio
>Priority: Critical
>  Labels: spark, streaming, structured
>
> I had a problem with a long-running Spark Structured Streaming job in Spark 2.1. 
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.requests.LeaveGroupResponse.
> The detailed error is the following:
> 18/03/16 16:10:57 INFO StreamExecution: Committed offsets for batch 2110. 
> Metadata OffsetSeqMetadata(0,1521216656590)
> 18/03/16 16:10:57 INFO KafkaSource: GetBatch called with start = 
> Some(\{"TopicName":{"2":5520197,"1":5521045,"3":5522054,"0":5527915}}), end = 
> \{"TopicName":{"2":5522730,"1":5523577,"3":5524586,"0":5530441}}
> 18/03/16 16:10:57 INFO KafkaSource: Partitions added: Map()
> 18/03/16 16:10:57 ERROR StreamExecution: Query [id = 
> a233b9ff-cc39-44d3-b953-a255986c04bf, runId = 
> 8520e3c0-2455-4ac1-9021-8518fb58b3f8] terminated with error
> java.util.zip.ZipException: invalid code lengths set
>  at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:164)
>  at java.io.FilterInputStream.read(FilterInputStream.java:133)
>  at java.io.FilterInputStream.read(FilterInputStream.java:107)
>  at 
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:354)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>  at org.apache.spark.util.Utils$.copyStream(Utils.scala:362)
>  at 
> org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:45)
>  at 
> org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:83)
>  at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:173)
>  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>  at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
>  at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:503)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:499)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at 
> org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 18/03/16 16:10:57 ERROR ClientUtils: Failed to close coordinator
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/requests/LeaveGroupResponse
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendLeaveGroupRequest(AbstractCoordinator.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:566)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:555)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:377)
>  at org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:66)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1383)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1364)
>  at 

[jira] [Commented] (SPARK-23739) Spark structured streaming long running problem

2018-03-23 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411503#comment-16411503
 ] 

Cody Koeninger commented on SPARK-23739:


I meant the version of the org.apache.kafka kafka-clients artifact,
not the org.apache.spark artifact.

You can also unzip the assembly jar, it should have a license file for
kafka-clients that has the version number appended to it, and that way
you can also verify that the class file for that class is in the
assembly.



> Spark structured streaming long running problem
> ---
>
> Key: SPARK-23739
> URL: https://issues.apache.org/jira/browse/SPARK-23739
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Florencio
>Priority: Critical
>  Labels: spark, streaming, structured
>
> I had a problem with a long-running Spark Structured Streaming job in Spark 2.1. 
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.requests.LeaveGroupResponse.
> The detailed error is the following:
> 18/03/16 16:10:57 INFO StreamExecution: Committed offsets for batch 2110. 
> Metadata OffsetSeqMetadata(0,1521216656590)
> 18/03/16 16:10:57 INFO KafkaSource: GetBatch called with start = 
> Some(\{"TopicName":{"2":5520197,"1":5521045,"3":5522054,"0":5527915}}), end = 
> \{"TopicName":{"2":5522730,"1":5523577,"3":5524586,"0":5530441}}
> 18/03/16 16:10:57 INFO KafkaSource: Partitions added: Map()
> 18/03/16 16:10:57 ERROR StreamExecution: Query [id = 
> a233b9ff-cc39-44d3-b953-a255986c04bf, runId = 
> 8520e3c0-2455-4ac1-9021-8518fb58b3f8] terminated with error
> java.util.zip.ZipException: invalid code lengths set
>  at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:164)
>  at java.io.FilterInputStream.read(FilterInputStream.java:133)
>  at java.io.FilterInputStream.read(FilterInputStream.java:107)
>  at 
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:354)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>  at org.apache.spark.util.Utils$.copyStream(Utils.scala:362)
>  at 
> org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:45)
>  at 
> org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:83)
>  at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:173)
>  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>  at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
>  at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:503)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:499)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at 
> org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 18/03/16 16:10:57 ERROR ClientUtils: Failed to close coordinator
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/requests/LeaveGroupResponse
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendLeaveGroupRequest(AbstractCoordinator.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:566)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:555)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:377)
>  at 

[jira] [Commented] (SPARK-23739) Spark structured streaming long running problem

2018-03-23 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411396#comment-16411396
 ] 

Cody Koeninger commented on SPARK-23739:


What version of the org.apache.kafka artifact is in the assembly?

> Spark structured streaming long running problem
> ---
>
> Key: SPARK-23739
> URL: https://issues.apache.org/jira/browse/SPARK-23739
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Florencio
>Priority: Critical
>  Labels: spark, streaming, structured
>
> I had a problem with a long-running Spark Structured Streaming job in Spark 2.1. 
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.requests.LeaveGroupResponse.
> The detailed error is the following:
> 18/03/16 16:10:57 INFO StreamExecution: Committed offsets for batch 2110. 
> Metadata OffsetSeqMetadata(0,1521216656590)
> 18/03/16 16:10:57 INFO KafkaSource: GetBatch called with start = 
> Some(\{"TopicName":{"2":5520197,"1":5521045,"3":5522054,"0":5527915}}), end = 
> \{"TopicName":{"2":5522730,"1":5523577,"3":5524586,"0":5530441}}
> 18/03/16 16:10:57 INFO KafkaSource: Partitions added: Map()
> 18/03/16 16:10:57 ERROR StreamExecution: Query [id = 
> a233b9ff-cc39-44d3-b953-a255986c04bf, runId = 
> 8520e3c0-2455-4ac1-9021-8518fb58b3f8] terminated with error
> java.util.zip.ZipException: invalid code lengths set
>  at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:164)
>  at java.io.FilterInputStream.read(FilterInputStream.java:133)
>  at java.io.FilterInputStream.read(FilterInputStream.java:107)
>  at 
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:354)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>  at org.apache.spark.util.Utils$.copyStream(Utils.scala:362)
>  at 
> org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:45)
>  at 
> org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:83)
>  at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:173)
>  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>  at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
>  at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:503)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:499)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at 
> org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 18/03/16 16:10:57 ERROR ClientUtils: Failed to close coordinator
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/requests/LeaveGroupResponse
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendLeaveGroupRequest(AbstractCoordinator.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:566)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:555)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:377)
>  at org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:66)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1383)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1364)
>  at 

[jira] [Resolved] (SPARK-18580) Use spark.streaming.backpressure.initialRate in DirectKafkaInputDStream

2018-03-21 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-18580.

  Resolution: Fixed
Target Version/s: 2.4.0

> Use spark.streaming.backpressure.initialRate in DirectKafkaInputDStream
> ---
>
> Key: SPARK-18580
> URL: https://issues.apache.org/jira/browse/SPARK-18580
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.2
>Reporter: Oleg Muravskiy
>Assignee: Oleksandr Konopko
>Priority: Major
>
> Currently the `spark.streaming.kafka.maxRatePerPartition` is used as the 
> initial rate when backpressure is enabled. This is too aggressive for the 
> application while it is still warming up.
> This is similar to SPARK-11627, applying the solution provided there to 
> DirectKafkaInputDStream.
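
For reference, the configuration keys involved; the values below are arbitrary examples for illustration, not recommendations:

{code:java}
// Illustrative values only; tune for the actual workload.
val conf = new org.apache.spark.SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  // What this ticket asks to be honored at startup:
  .set("spark.streaming.backpressure.initialRate", "1000")
  // Hard per-partition ceiling, applied whether or not backpressure is on:
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
{code}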



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-18580) Use spark.streaming.backpressure.initialRate in DirectKafkaInputDStream

2018-03-21 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger reassigned SPARK-18580:
--

Assignee: Oleksandr Konopko

> Use spark.streaming.backpressure.initialRate in DirectKafkaInputDStream
> ---
>
> Key: SPARK-18580
> URL: https://issues.apache.org/jira/browse/SPARK-18580
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.2
>Reporter: Oleg Muravskiy
>Assignee: Oleksandr Konopko
>Priority: Major
>
> Currently the `spark.streaming.kafka.maxRatePerPartition` is used as the 
> initial rate when backpressure is enabled. This is too aggressive for the 
> application while it is still warming up.
> This is similar to SPARK-11627, applying the solution provided there to 
> DirectKafkaInputDStream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18371) Spark Streaming backpressure bug - generates a batch with large number of records

2018-03-16 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-18371.

   Resolution: Fixed
Fix Version/s: 2.4.0

> Spark Streaming backpressure bug - generates a batch with large number of 
> records
> -
>
> Key: SPARK-18371
> URL: https://issues.apache.org/jira/browse/SPARK-18371
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: mapreduced
>Assignee: Sebastian Arzt
>Priority: Major
> Fix For: 2.4.0
>
> Attachments: 01.png, 02.png, GiantBatch2.png, GiantBatch3.png, 
> Giant_batch_at_23_00.png, Look_at_batch_at_22_14.png
>
>
> When the streaming job is configured with backpressureEnabled=true, it 
> generates a GIANT batch of records if the processing time + scheduled delay 
> is (much) larger than batchDuration. This creates an enormous backlog of 
> records and results in batches queueing for hours until the job chews through 
> this giant batch.
> The expectation is that it should, within some time, reduce the number of 
> records per batch to whatever it can actually process.
> Attaching some screenshots showing that this issue is quite easily 
> reproducible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-18371) Spark Streaming backpressure bug - generates a batch with large number of records

2018-03-16 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger reassigned SPARK-18371:
--

Assignee: Sebastian Arzt

> Spark Streaming backpressure bug - generates a batch with large number of 
> records
> -
>
> Key: SPARK-18371
> URL: https://issues.apache.org/jira/browse/SPARK-18371
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: mapreduced
>Assignee: Sebastian Arzt
>Priority: Major
> Attachments: 01.png, 02.png, GiantBatch2.png, GiantBatch3.png, 
> Giant_batch_at_23_00.png, Look_at_batch_at_22_14.png
>
>
> When the streaming job is configured with backpressureEnabled=true, it 
> generates a GIANT batch of records if the processing time + scheduled delay 
> is (much) larger than batchDuration. This creates an enormous backlog of 
> records and results in batches queueing for hours until the job chews through 
> this giant batch.
> The expectation is that it should, within some time, reduce the number of 
> records per batch to whatever it can actually process.
> Attaching some screenshots showing that this issue is quite easily 
> reproducible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 0.10.0.1 to 1.1.0

2018-03-05 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387245#comment-16387245
 ] 

Cody Koeninger commented on SPARK-18057:


It's probably easiest to keep the KIP discussion on the Kafka mailing list.  I 
personally think overloads taking a timeout is probably the most flexible 
option, but I think any of the options under discussion (overloads, reuse 
existing config, new config) are workable from a Spark integration point of 
view.

> Update structured streaming kafka from 0.10.0.1 to 1.1.0
> 
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19767) API Doc pages for Streaming with Kafka 0.10 not current

2018-03-05 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387217#comment-16387217
 ] 

Cody Koeninger commented on SPARK-19767:


[~nafshartous] I think at this point people are more likely to need easy access 
to the API docs for the 0.10 artifact rather than the 0.8, do you agree?

If so look at commit 

1f0d0213

for what I did to skip 0.10

If you want to change that to skip 0.8 instead, and include 0.10, I'd support 
that.

Although now that I look at it, the latest api docs 
[http://spark.apache.org/docs/latest/api/scala/index.html]  no longer have the 
kafka namespace at all, whereas 
[http://spark.apache.org/docs/2.2.1/api/scala/index.html]  still did.  Haven't 
dug into why.

> API Doc pages for Streaming with Kafka 0.10 not current
> ---
>
> Key: SPARK-19767
> URL: https://issues.apache.org/jira/browse/SPARK-19767
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Nick Afshartous
>Priority: Minor
>
> The API docs linked from the Spark Kafka 0.10 Integration page are not 
> current.  For instance, on the page
>https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> the code examples show the new API (i.e. class ConsumerStrategies).  However, 
> following the links
> API Docs --> (Scala | Java)
> leads to API pages that do not have the class ConsumerStrategies.  The API doc 
> package names  also have {code}streaming.kafka{code} as opposed to 
> {code}streaming.kafka10{code} 
> as in the code examples on streaming-kafka-0-10-integration.html.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2018-02-20 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370924#comment-16370924
 ] 

Cody Koeninger commented on SPARK-18057:


Just doing the upgrade is probably a good starting point for any
potential new contributor who's interested in Kafka.

Agreed that time indexes would be a great thing to take advantage of
after that, happy to help out with either.



> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2018-02-20 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370460#comment-16370460
 ] 

Cody Koeninger commented on SPARK-18057:


My guess is that DStream-based integrations aren't really on committers' minds.

Happy to help, given clear direction on what artifact naming is likely
to be accepted.



> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>Priority: Major
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22561) Dynamically update topics list for spark kafka consumer

2017-11-23 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-22561.

Resolution: Not A Problem

> Dynamically update topics list for spark kafka consumer
> ---
>
> Key: SPARK-22561
> URL: https://issues.apache.org/jira/browse/SPARK-22561
> Project: Spark
>  Issue Type: New Feature
>  Components: DStreams
>Affects Versions: 2.1.0, 2.1.1, 2.2.0
>Reporter: Arun
>
> The Spark Streaming application should allow adding new topics after the 
> streaming context is initialized and the DStream is started.  This is a very 
> useful feature, especially when a business operates across multiple 
> geographies or business units.
> For example, initially I have a spark-kafka consumer listening for the topics 
> ["topic-1", "topic-2"], and after a couple of days I add new topics 
> ["topic-3", "topic-4"] to Kafka. Is there a way to update the spark-kafka 
> consumer's topic list and have it consume data for the updated list of topics 
> without stopping the Spark Streaming application or the streaming context?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22561) Dynamically update topics list for spark kafka consumer

2017-11-23 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264643#comment-16264643
 ] 

Cody Koeninger commented on SPARK-22561:


See SubscribePattern

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#consumerstrategies
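
A minimal sketch of that strategy with the DStream API; `ssc` and `kafkaParams` are assumed to already exist:

{code:java}
import java.util.regex.Pattern
import org.apache.spark.streaming.kafka010._

// Topics matching the regex are picked up as they appear (subject to the
// consumer's metadata refresh, metadata.max.age.ms).
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](
    Pattern.compile("topic-.*"), kafkaParams))
{code}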

> Dynamically update topics list for spark kafka consumer
> ---
>
> Key: SPARK-22561
> URL: https://issues.apache.org/jira/browse/SPARK-22561
> Project: Spark
>  Issue Type: New Feature
>  Components: DStreams
>Affects Versions: 2.1.0, 2.1.1, 2.2.0
>Reporter: Arun
>
> The Spark Streaming application should allow adding new topics after the 
> streaming context is initialized and the DStream is started.  This is a very 
> useful feature, especially when a business operates across multiple 
> geographies or business units.
> For example, initially I have a spark-kafka consumer listening for the topics 
> ["topic-1", "topic-2"], and after a couple of days I add new topics 
> ["topic-3", "topic-4"] to Kafka. Is there a way to update the spark-kafka 
> consumer's topic list and have it consume data for the updated list of topics 
> without stopping the Spark Streaming application or the streaming context?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22486) Support synchronous offset commits for Kafka

2017-11-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248891#comment-16248891
 ] 

Cody Koeninger commented on SPARK-22486:


Can you identify a clear use case for this, given that

- applications have to be tolerant of repeated messages anyway
- the consumer doing the work (on the executor) is not the same as the consumer 
doing the coordination and offset commits (on the driver)

> Support synchronous offset commits for Kafka
> 
>
> Key: SPARK-22486
> URL: https://issues.apache.org/jira/browse/SPARK-22486
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.2.0
>Reporter: Jeremy Beard
>
> CanCommitOffsets provides asynchronous offset commits (via 
> Consumer#commitAsync), and it would be useful if it also provided synchronous 
> offset commits (via Consumer#commitSync) for when the desired behavior is to 
> block until it is complete.
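
For context, the existing asynchronous path looks roughly like the following (`stream` is a direct stream created elsewhere); the request is for a blocking commitSync counterpart to this call:

{code:java}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd ...
  // Fire-and-forget today: the commit is queued and sent on a later poll.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}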



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19680) Offsets out of range with no configured reset policy for partitions

2017-11-07 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16242438#comment-16242438
 ] 

Cody Koeninger commented on SPARK-19680:


If you got an OffsetOutOfRangeException after a job had been running
for 4 days, it likely means that your job was not running fast enough
to keep up, and retention expired records that had not yet been
processed.



> Offsets out of range with no configured reset policy for partitions
> ---
>
> Key: SPARK-19680
> URL: https://issues.apache.org/jira/browse/SPARK-19680
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Schakmann Rene
>
> I'm using Spark Streaming with Kafka to actually create a top list. I want to 
> read all the messages in kafka. So I set
>"auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our Spark cluster it is not working; I 
> get:
> Error:
> {code:title=error.log|borderStyle=solid}
>   Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, 
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, 
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {SearchEvents-2=161803385}
> {code}
> This is somehow wrong because I did set the auto.offset.reset property
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, 
> Object] = {
> Map(
>   "bootstrap.servers" -> 
> properties.getProperty("kafka.bootstrap.servers"),
>   "group.id" -> properties.getProperty("kafka.consumer.group"),
>   "auto.offset.reset" -> "earliest",
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "enable.auto.commit" -> "false",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, 
> Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: 
> Broadcast[KafkaSink[TopList]]): Unit = {
> getFilteredStream(stream.map(_.value()), windowDuration, 
> slideDuration).foreachRDD(rdd => {
>   val topList = new TopList
>   topList.setCreated(new Date())
>   topList.setTopListEntryList(rdd.take(TopListLength).toList)
>   CurrentLogger.info("TopList length: " + 
> topList.getTopListEntryList.size().toString)
>   kafkaSink.value.send(SendToTopicName, topList)
>   CurrentLogger.info("Last Run: " + System.currentTimeMillis())
> })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, 
> slideDuration: Int): DStream[TopListEntry] = {
> val Mapper = MapperObject.readerFor[SearchEventDTO]
> result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>   .filter(s => s != null && s.getSearchRequest != null && 
> s.getSearchRequest.getSearchParameters != null && s.getVertical == 
> Vertical.BAP && 
> s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>   .map(row => {
> val name = 
> row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
> (name, new TopListEntry(name, 1, row.getResultCount))
>   })
>   .reduceByKeyAndWindow(
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + 
> b.getMeanSearchHits),
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - 
> b.getMeanSearchHits),
> Minutes(windowDuration),
> Seconds(slideDuration))
>   .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>   .map(row => (row._2.getSearchCount, row._2))
>   .transform(rdd => rdd.sortByKey(ascending = false))
>   .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, 
> row._2.getMeanSearchHits / row._2.getSearchCount))
>   }
>   def main(properties: Properties): Unit = {
> val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
> val kafkaSink = 
> sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
> val kafkaParams: Map[String, Object] = 
> SparkUtil.getDefaultKafkaReceiverParameter(properties)
> val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
> ssc.checkpoint("/home/spark/checkpoints")
> val adEventStream =
>   

[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming

2017-10-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221025#comment-16221025
 ] 

Cody Koeninger commented on SPARK-20928:


No, it doesn't exist yet as far as I know.

Reason I ask is that Michael had said on the dev list in September "I
think that we are going to have to change the Sink API as part of
SPARK-20928, which is why I linked these tickets together."

For aggregates, conceptually I think that the minimum and maximum per
partition kafka offset for any data involved in the aggregate is
sufficient to identify it.  But it seems like map-only is the bigger
focus here, which is probably fine.




> SPIP: Continuous Processing Mode for Structured Streaming
> -
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>  Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design 
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) SPIP: Continuous Processing Mode for Structured Streaming

2017-10-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16220694#comment-16220694
 ] 

Cody Koeninger commented on SPARK-20928:


Can you clarify how this impacts sinks having access to the underlying kafka 
offsets, e.g. https://issues.apache.org/jira/browse/SPARK-18258

> SPIP: Continuous Processing Mode for Structured Streaming
> -
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>  Labels: SPIP
> Attachments: Continuous Processing in Structured Streaming Design 
> Sketch.pdf
>
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-10-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202923#comment-16202923
 ] 

Cody Koeninger commented on SPARK-20928:


If a given sink is handling a result, why does also handling the
offset corresponding to that result substantially increase overhead?

Thinking about it in terms of a downstream database, if I'm doing a
write per result, then the difference between writing (result) and
writing (result, offset) seems like it should be overshadowed by the
overall cost of the write.

In more practical terms, poll() on the kafka consumer is returning a
batch of pre-fetched messages anyway, not a single message, so one
should be able to run their straight line map/filter/whatever on the
batch and then commit results with the last offset.
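
As a hedged illustration of writing (result, offset) rather than just (result): the JDBC schema and names below are hypothetical, the only point being that the results and their last offset land in one transaction:

{code:java}
import java.sql.DriverManager

// Hypothetical tables: results(k, v) and stream_offsets(topic, part, off).
def writeBatch(jdbcUrl: String, results: Seq[(String, Long)],
               topic: String, part: Int, lastOffset: Long): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)
    val ins = conn.prepareStatement("INSERT INTO results (k, v) VALUES (?, ?)")
    results.foreach { case (k, v) =>
      ins.setString(1, k); ins.setLong(2, v); ins.addBatch()
    }
    ins.executeBatch()
    val off = conn.prepareStatement(
      "UPDATE stream_offsets SET off = ? WHERE topic = ? AND part = ?")
    off.setLong(1, lastOffset); off.setString(2, topic); off.setInt(3, part)
    off.executeUpdate()
    conn.commit() // result and offset become visible atomically
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
  }
}
{code}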




> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>  Labels: SPIP
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user-requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-10-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202905#comment-16202905
 ] 

Cody Koeninger commented on SPARK-20928:


I was talking about the specific case of jobs with only narrow stages.  If 
there's no shuffle, then it should be sufficient at any given point to record 
the per-partition offset alongside the result.

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>  Labels: SPIP
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user requested change in parallelism.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2017-09-07 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157002#comment-16157002
 ] 

Cody Koeninger commented on SPARK-17147:


The patch is there; if anyone wants to test it and provide feedback, I'm happy to 
help.  We don't use compacted topics, so I can't really test it against 
production workloads.

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
> --
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: Robert Conrad
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19680) Offsets out of range with no configured reset policy for partitions

2017-07-06 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16076646#comment-16076646
 ] 

Cody Koeninger commented on SPARK-19680:


Direct Stream can take a mapping from topicpartition to starting offset.

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#your-own-data-store

Identifying the appropriate starting offset is up to you in that case - could 
be based on your offset store, or the current high / low water marks from a 
kafka consumer, or...
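A minimal sketch of wiring that in, assuming ssc and kafkaParams are already set up 
as in the other snippets on this thread, and that the starting offsets were loaded 
from your own store (the topic name and offset values are illustrative):

{code}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

// Starting offsets looked up from your own store.
val fromOffsets: Map[TopicPartition, Long] = Map(
  new TopicPartition("some-topic", 0) -> 42000L,
  new TopicPartition("some-topic", 1) -> 42000L)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Assign[String, String](
    fromOffsets.keys.toList, kafkaParams, fromOffsets))
{code}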

If you're looking to add a friendlier interface in spark, probably the best 
way to do this is to expose something based on the kafka time index 
(.offsetsForTimes), but that requires the latest versions of kafka:

https://issues.apache.org/jira/browse/KAFKA-3163

We rolled our own time index way before kafka added it, so I wouldn't be in a 
position to test a patch like that on production loads.  But if someone is 
willing to commit to it (at least for testing), I can help with the work.
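If someone does pick that up, a rough sketch of the client-side shape it might take is 
below. It assumes the 0.10.1+ kafka-clients, the helper name is made up for 
illustration, and kafkaParams is the usual consumer config map:

{code}
import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Translate a timestamp into per-partition starting offsets using the kafka time index.
// Partitions with no entry at or after the timestamp come back as null and are dropped.
def offsetsForTimestamp(kafkaParams: ju.Map[String, Object],
                        partitions: Seq[TopicPartition],
                        timestampMs: Long): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
  try {
    val query = partitions.map(tp => tp -> java.lang.Long.valueOf(timestampMs)).toMap.asJava
    consumer.offsetsForTimes(query).asScala.collect {
      case (tp, oat) if oat != null => tp -> oat.offset()
    }.toMap
  } finally {
    consumer.close()
  }
}
{code}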



> Offsets out of range with no configured reset policy for partitions
> ---
>
> Key: SPARK-19680
> URL: https://issues.apache.org/jira/browse/SPARK-19680
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Schakmann Rene
>
> I'm using spark streaming with kafka to actually create a toplist. I want to 
> read all the messages in kafka. So I set
>"auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our spark cluster it is not working; I 
> get:
> Error:
> {code:title=error.log|borderStyle=solid}
>   Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, 
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, 
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {SearchEvents-2=161803385}
> {code}
> This is somehow wrong because I did set the auto.offset.reset property
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, 
> Object] = {
> Map(
>   "bootstrap.servers" -> 
> properties.getProperty("kafka.bootstrap.servers"),
>   "group.id" -> properties.getProperty("kafka.consumer.group"),
>   "auto.offset.reset" -> "earliest",
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "enable.auto.commit" -> "false",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, 
> Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: 
> Broadcast[KafkaSink[TopList]]): Unit = {
> getFilteredStream(stream.map(_.value()), windowDuration, 
> slideDuration).foreachRDD(rdd => {
>   val topList = new TopList
>   topList.setCreated(new Date())
>   topList.setTopListEntryList(rdd.take(TopListLength).toList)
>   CurrentLogger.info("TopList length: " + 
> topList.getTopListEntryList.size().toString)
>   kafkaSink.value.send(SendToTopicName, topList)
>   CurrentLogger.info("Last Run: " + System.currentTimeMillis())
> })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, 
> slideDuration: Int): DStream[TopListEntry] = {
> val Mapper = MapperObject.readerFor[SearchEventDTO]
> result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>   .filter(s => s != null && s.getSearchRequest != null && 
> s.getSearchRequest.getSearchParameters != null && s.getVertical == 
> Vertical.BAP && 
> s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>   .map(row => {
> val name = 
> row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
> (name, new TopListEntry(name, 1, row.getResultCount))
>   })
>   .reduceByKeyAndWindow(
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + 
> b.getMeanSearchHits),
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - 
> b.getMeanSearchHits),
> Minutes(windowDuration),
> Seconds(slideDuration))
>   .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>   .map(row => (row._2.getSearchCount, row._2))
>   .transform(rdd => rdd.sortByKey(ascending = false))
>   .map(row => new TopListEntry(row._2.getKeyword, 

[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-06-29 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068980#comment-16068980
 ] 

Cody Koeninger commented on SPARK-18057:


Kafka 0.11 is now released.  

Are we upgrading spark artifacts named kafka-0-10 to use kafka 0.11, or are we 
renaming them to kafka-0-11?

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21233) Support pluggable offset storage

2017-06-28 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16066517#comment-16066517
 ] 

Cody Koeninger commented on SPARK-21233:


You already have the choice of where you want to store offsets.

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets
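For reference, a minimal sketch of one of those options (committing back to kafka 
only after your own output has succeeded), assuming stream is a direct stream created 
as in the linked docs:

{code}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Offset ranges are only available on the RDD returned directly by createDirectStream.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... write this batch's results to your own store first ...
  // then record the offsets: either persist offsetRanges yourself, or
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}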

> Support pluggable offset storage
> 
>
> Key: SPARK-21233
> URL: https://issues.apache.org/jira/browse/SPARK-21233
> Project: Spark
>  Issue Type: New Feature
>  Components: DStreams
>Affects Versions: 2.0.2, 2.1.1
>Reporter: darion yaphet
>
> Currently we use *ZooKeeper* to save the *Kafka Commit Offset*; when there 
> are a lot of streaming programs running in the cluster, the ZooKeeper cluster's 
> load is very high. Maybe ZooKeeper is not very suitable for saving offsets 
> periodically.
> This issue proposes supporting pluggable offset storage, to avoid saving it in 
> ZooKeeper.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-06-19 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16054690#comment-16054690
 ] 

Cody Koeninger commented on SPARK-20928:


Cool, can you label it SPIP so it shows up linked from 
http://spark.apache.org/improvement-proposals.html

My only concern so far was the one I mentioned already, namely that it seems 
like you shouldn't have to give up exactly-once delivery semantics in all cases.

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-06-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16052831#comment-16052831
 ] 

Cody Koeninger commented on SPARK-20928:


This needs an improvement proposal.

Based on discussions on the mailing list and representations made at Spark 
Summit, this is already a "work in progress".

If that's the case, it needs community involvement now, not later, to reduce 
the danger of Databricks developing something that meets their internal needs 
but not community needs.

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20928) Continuous Processing Mode for Structured Streaming

2017-06-03 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035964#comment-16035964
 ] 

Cody Koeninger commented on SPARK-20928:


For jobs that only have narrow stages, I think it should be possible to 
maintain delivery semantics as well.

> Continuous Processing Mode for Structured Streaming
> ---
>
> Key: SPARK-20928
> URL: https://issues.apache.org/jira/browse/SPARK-20928
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Michael Armbrust
>
> Given the current Source API, the minimum possible latency for any record is 
> bounded by the amount of time that it takes to launch a task.  This 
> limitation is a result of the fact that {{getBatch}} requires us to know both 
> the starting and the ending offset, before any tasks are launched.  In the 
> worst case, the end-to-end latency is actually closer to the average batch 
> time + task launching time.
> For applications where latency is more important than exactly-once output 
> however, it would be useful if processing could happen continuously.  This 
> would allow us to achieve fully pipelined reading and writing from sources 
> such as Kafka.  This kind of architecture would make it possible to process 
> records with end-to-end latencies on the order of 1 ms, rather than the 
> 10-100ms that is possible today.
> One possible architecture here would be to change the Source API to look like 
> the following rough sketch:
> {code}
>   trait Epoch {
> def data: DataFrame
> /** The exclusive starting position for `data`. */
> def startOffset: Offset
> /** The inclusive ending position for `data`.  Incrementally updated 
> during processing, but not complete until execution of the query plan in 
> `data` is finished. */
> def endOffset: Offset
>   }
>   def getBatch(startOffset: Option[Offset], endOffset: Option[Offset], 
> limits: Limits): Epoch
> {code}
> The above would allow us to build an alternative implementation of 
> {{StreamExecution}} that processes continuously with much lower latency and 
> only stops processing when needing to reconfigure the stream (either due to a 
> failure or a user requested change in parallelism).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-04-20 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977709#comment-15977709
 ] 

Cody Koeninger commented on SPARK-18057:


People have also been reporting that explicit dependency on 0.10.2.0 was 
working for them where 0.10.1.0 wasn't (e.g. SPARK-20036)

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20036) impossible to read a whole kafka topic using kafka 0.10 and spark 2.0.0

2017-04-18 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973554#comment-15973554
 ] 

Cody Koeninger commented on SPARK-20036:


[~danielnuriyev] what actually happened when you removed all of the explicit 
org.apache.kafka dependencies?

> impossible to read a whole kafka topic using kafka 0.10 and spark 2.0.0 
> 
>
> Key: SPARK-20036
> URL: https://issues.apache.org/jira/browse/SPARK-20036
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.0.0
>Reporter: Daniel Nuriyev
> Attachments: Main.java, pom.xml
>
>
> I use kafka 0.10.1 and java code with the following dependencies:
> - org.apache.kafka:kafka_2.11:0.10.1.1
> - org.apache.kafka:kafka-clients:0.10.1.1
> - org.apache.spark:spark-streaming_2.11:2.0.0
> - org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.0
> The code tries to read the whole topic using:
> kafkaParams.put("auto.offset.reset", "earliest");
> Using 5 second batches:
> jssc = new JavaStreamingContext(conf, Durations.seconds(5));
> Each batch returns empty.
> When I debugged the code, I noticed that KafkaUtils.fixKafkaParams is called, 
> which overrides "earliest" with "none".
> Whether this is related or not, when I used kafka 0.8 on the client with 
> kafka 0.10.1 on the server, I could read the whole topic.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20036) impossible to read a whole kafka topic using kafka 0.10 and spark 2.0.0

2017-04-18 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973507#comment-15973507
 ] 

Cody Koeninger commented on SPARK-20036:


I'll submit a PR to add a note to the docs about this.

> impossible to read a whole kafka topic using kafka 0.10 and spark 2.0.0 
> 
>
> Key: SPARK-20036
> URL: https://issues.apache.org/jira/browse/SPARK-20036
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.0.0
>Reporter: Daniel Nuriyev
> Attachments: Main.java, pom.xml
>
>
> I use kafka 0.10.1 and java code with the following dependencies:
> - org.apache.kafka:kafka_2.11:0.10.1.1
> - org.apache.kafka:kafka-clients:0.10.1.1
> - org.apache.spark:spark-streaming_2.11:2.0.0
> - org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.0
> The code tries to read the whole topic using:
> kafkaParams.put("auto.offset.reset", "earliest");
> Using 5 second batches:
> jssc = new JavaStreamingContext(conf, Durations.seconds(5));
> Each batch returns empty.
> When I debugged the code, I noticed that KafkaUtils.fixKafkaParams is called, 
> which overrides "earliest" with "none".
> Whether this is related or not, when I used kafka 0.8 on the client with 
> kafka 0.10.1 on the server, I could read the whole topic.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20287) Kafka Consumer should be able to subscribe to more than one topic partition

2017-04-13 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-20287:
---

What you're describing is closer to the receiver-based implementation,
which had a number of issues.  What I tried to achieve with the direct
stream implementation was to have the driver figure out offset ranges
for the next batch, then have executors deterministically consume
exactly those messages with a 1:1 mapping between kafka partition and
spark partition.

If you have a single consumer subscribed to multiple topicpartitions,
you'll get intermingled messages for all of those partitions.  With
the new consumer api subscribed to multiple partitions, there isn't a
way to say "get topicpartition A until offset 1234", which is what we
need.
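
To make the batch-definition model concrete, a small sketch using the batch API, 
where sc is an existing SparkContext and kafkaParams is the usual consumer config as 
a java Map (topic and offsets are illustrative):

{code}
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

// The driver fixes the exact per-partition offset ranges up front; each Spark
// partition then deterministically consumes just that slice of one kafka partition.
val offsetRanges = Array(
  OffsetRange("some-topic", 0, 1000L, 2000L),
  OffsetRange("some-topic", 1, 1000L, 2000L))

val rdd = KafkaUtils.createRDD[String, String](
  sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)
{code}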




-- 
Cody Koeninger


> Kafka Consumer should be able to subscribe to more than one topic partition
> ---
>
> Key: SPARK-20287
> URL: https://issues.apache.org/jira/browse/SPARK-20287
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Stephane Maarek
>
> As I understand and as it stands, one Kafka Consumer is created for each 
> topic partition in the source Kafka topics, and they're cached.
> cf 
> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala#L48
> In my opinion, that makes the design an anti-pattern for Kafka and highly 
> inefficient:
> - Each Kafka consumer creates a connection to Kafka
> - Spark doesn't leverage the power of the Kafka consumers, which is that it 
> automatically assigns and balances partitions amongst all the consumers that 
> share the same group.id
> - You can still cache your Kafka consumer even if it has multiple partitions.
> I'm not sure about how that translates to the spark underlying RDD 
> architecture, but from a Kafka standpoint, I believe creating one consumer 
> per partition is a big overhead, and a risk as the user may have to increase 
> the spark.streaming.kafka.consumer.cache.maxCapacity parameter. 
> Happy to discuss to understand the rationale



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19976) DirectStream API throws OffsetOutOfRange Exception

2017-04-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966208#comment-15966208
 ] 

Cody Koeninger commented on SPARK-19976:


What would your expected behavior be when you delete data out of kafka before 
the stream has read it?

My expectation would be that it fails.

> DirectStream API throws OffsetOutOfRange Exception
> --
>
> Key: SPARK-19976
> URL: https://issues.apache.org/jira/browse/SPARK-19976
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: Taukir
>
> I am using the following code. When data on the kafka topic gets deleted / the 
> retention period is over, it throws an exception and the application crashes.
> def functionToCreateContext(sc:SparkContext):StreamingContext = {
> val kafkaParams = new mutable.HashMap[String, Object]()
> kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
> kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerGrp)
> kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> classOf[StringDeserializer])
> kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> classOf[StringDeserializer])
> kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true")
> kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>val consumerStrategy = ConsumerStrategies.Subscribe[String, 
> String](topic.split(",").map(_.trim).filter(!_.isEmpty).toSet, kafkaParams)
> val kafkaStream  = 
> KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,consumerStrategy)
> }
> Spark throws an error and crashes once OffsetOutOfRangeException is thrown:
> WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 : 
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of 
> range with no configured reset policy for partitions: {test-2=127287}
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:98)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20037) impossible to set kafka offsets using kafka 0.10 and spark 2.0.0

2017-04-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966199#comment-15966199
 ] 

Cody Koeninger commented on SPARK-20037:


I'd be inclined to say this is a duplicate of the issue in SPARK-20036, and 
would start investigating the same way, i.e. remove the explicit dependencies on 
org.apache.kafka artifacts.

> impossible to set kafka offsets using kafka 0.10 and spark 2.0.0
> 
>
> Key: SPARK-20037
> URL: https://issues.apache.org/jira/browse/SPARK-20037
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.0.0
>Reporter: Daniel Nuriyev
> Attachments: Main.java, offsets.png
>
>
> I use kafka 0.10.1 and java code with the following dependencies:
> - org.apache.kafka:kafka_2.11:0.10.1.1
> - org.apache.kafka:kafka-clients:0.10.1.1
> - org.apache.spark:spark-streaming_2.11:2.0.0
> - org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.0
> The code tries to read a topic starting from specific offsets. 
> The topic has 4 partitions that start somewhere before 585000 and end after 
> 674000. So I wanted to read all partitions starting with 585000
> fromOffsets.put(new TopicPartition(topic, 0), 585000L);
> fromOffsets.put(new TopicPartition(topic, 1), 585000L);
> fromOffsets.put(new TopicPartition(topic, 2), 585000L);
> fromOffsets.put(new TopicPartition(topic, 3), 585000L);
> Using 5 second batches:
> jssc = new JavaStreamingContext(conf, Durations.seconds(5));
> The code immediately throws:
> Beginning offset 585000 is after the ending offset 584464 for topic 
> commerce_item_expectation partition 1
> It does not make sense because this topic/partition starts at 584464, not ends
> I use this as a base: 
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> But I use direct stream:
> KafkaUtils.createDirectStream(jssc,LocationStrategies.PreferConsistent(),
> ConsumerStrategies.Subscribe(
> topics, kafkaParams, fromOffsets
> )
> )



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20036) impossible to read a whole kafka topic using kafka 0.10 and spark 2.0.0

2017-04-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966193#comment-15966193
 ] 

Cody Koeninger commented on SPARK-20036:


fixKafkaParams is related to executor consumers, not the driver consumer.  In a 
nutshell, the executor should read the offsets the driver tells it to, not auto 
reset.

Remove the explicit dependencies in your pom on org.apache.kafka artifacts.  
spark-streaming-kafka-0-10 has appropriate transitive dependencies.

> impossible to read a whole kafka topic using kafka 0.10 and spark 2.0.0 
> 
>
> Key: SPARK-20036
> URL: https://issues.apache.org/jira/browse/SPARK-20036
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.0.0
>Reporter: Daniel Nuriyev
> Attachments: Main.java, pom.xml
>
>
> I use kafka 0.10.1 and java code with the following dependencies:
> - org.apache.kafka:kafka_2.11:0.10.1.1
> - org.apache.kafka:kafka-clients:0.10.1.1
> - org.apache.spark:spark-streaming_2.11:2.0.0
> - org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.0
> The code tries to read the whole topic using:
> kafkaParams.put("auto.offset.reset", "earliest");
> Using 5 second batches:
> jssc = new JavaStreamingContext(conf, Durations.seconds(5));
> Each batch returns empty.
> When I debugged the code, I noticed that KafkaUtils.fixKafkaParams is called, 
> which overrides "earliest" with "none".
> Whether this is related or not, when I used kafka 0.8 on the client with 
> kafka 0.10.1 on the server, I could read the whole topic.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20287) Kafka Consumer should be able to subscribe to more than one topic partition

2017-04-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15966180#comment-15966180
 ] 

Cody Koeninger commented on SPARK-20287:


The issue here is that the underlying new Kafka consumer api doesn't have a way 
for a single consumer to subscribe to multiple partitions, but only read a 
particular range of messages from one of them.

The max capacity is just a simple way of dealing with what is basically a LRU 
cache - if someone creates topics dynamically and then stops sending messages 
to them, you don't want to keep leaking resources.

I'm not claiming there's anything great or elegant about those solutions, but 
they were pretty much the most straightforward way to make the direct stream 
model work with the new kafka consumer api.
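
If the defaults don't fit your workload, the knobs are just spark confs; a small 
sketch (the values are illustrative, not recommendations):

{code}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-consumer-cache-tuning")
  // Raise the per-executor cap on cached consumers if you legitimately read many partitions...
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
  // ...or opt out of consumer caching entirely, as some of the other tickets in this thread do.
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")
{code}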

> Kafka Consumer should be able to subscribe to more than one topic partition
> ---
>
> Key: SPARK-20287
> URL: https://issues.apache.org/jira/browse/SPARK-20287
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Stephane Maarek
>
> As I understand and as it stands, one Kafka Consumer is created for each 
> topic partition in the source Kafka topics, and they're cached.
> cf 
> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala#L48
> In my opinion, that makes the design an anti-pattern for Kafka and highly 
> inefficient:
> - Each Kafka consumer creates a connection to Kafka
> - Spark doesn't leverage the power of the Kafka consumers, which is that it 
> automatically assigns and balances partitions amongst all the consumers that 
> share the same group.id
> - You can still cache your Kafka consumer even if it has multiple partitions.
> I'm not sure about how that translates to the spark underlying RDD 
> architecture, but from a Kafka standpoint, I believe creating one consumer 
> per partition is a big overhead, and a risk as the user may have to increase 
> the spark.streaming.kafka.consumer.cache.maxCapacity parameter. 
> Happy to discuss to understand the rationale



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19904) SPIP Add Spark Project Improvement Proposal doc to website

2017-03-27 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943827#comment-15943827
 ] 

Cody Koeninger commented on SPARK-19904:


It has been added to  apache/spark-website  git repo 
http://spark.apache.org/improvement-proposals.html  , with a link under the 
Community menu item.

It has not been added to apache/spark git repo 
http://spark.apache.org/docs/latest/  under the More menu item.

I'm not 100% clear on what the plan was for maintaining some of the website 
docs in a separate site from the main repo, and whether it's worth updating 
both for cross-linking.

> SPIP Add Spark Project Improvement Proposal doc to website
> --
>
> Key: SPARK-19904
> URL: https://issues.apache.org/jira/browse/SPARK-19904
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.1.0
>Reporter: Cody Koeninger
>Assignee: Cody Koeninger
>  Labels: SPIP
>
> see
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-Improvement-Proposals-td19268.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19904) SPIP Add Spark Project Improvement Proposal doc to website

2017-03-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15906593#comment-15906593
 ] 

Cody Koeninger commented on SPARK-19904:


Up now at

http://spark.apache.org/improvement-proposals.html

Is it worth also linking from the "More" tab of the docs in

http://spark.apache.org/docs/latest/

like the Contributing to Spark page?



> SPIP Add Spark Project Improvement Proposal doc to website
> --
>
> Key: SPARK-19904
> URL: https://issues.apache.org/jira/browse/SPARK-19904
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.1.0
>Reporter: Cody Koeninger
>  Labels: SPIP
>
> see
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-Improvement-Proposals-td19268.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19904) SPIP Add Spark Project Improvement Proposal doc to website

2017-03-12 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger reassigned SPARK-19904:
--

Assignee: Cody Koeninger

> SPIP Add Spark Project Improvement Proposal doc to website
> --
>
> Key: SPARK-19904
> URL: https://issues.apache.org/jira/browse/SPARK-19904
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.1.0
>Reporter: Cody Koeninger
>Assignee: Cody Koeninger
>  Labels: SPIP
>
> see
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-Improvement-Proposals-td19268.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19904) SPIP Add Spark Project Improvement Proposal doc to website

2017-03-12 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-19904:
---
Issue Type: Improvement  (was: Bug)

> SPIP Add Spark Project Improvement Proposal doc to website
> --
>
> Key: SPARK-19904
> URL: https://issues.apache.org/jira/browse/SPARK-19904
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.1.0
>Reporter: Cody Koeninger
>  Labels: SPIP
>
> see
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-Improvement-Proposals-td19268.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-03-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905774#comment-15905774
 ] 

Cody Koeninger commented on SPARK-18057:


Based on previous kafka client upgrades I wouldn't expect them to be binary 
compatible, so it's likely to cause someone problems if they were also making 
use of kafka client libraries in their spark job.  Still may be the path of 
least resistance.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-03-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905747#comment-15905747
 ] 

Cody Koeninger commented on SPARK-18057:


I think the bigger question is: once there's a kafka version you want to upgrade 
to, are you going to just forcibly upgrade, make another set of separate 
artifacts, or refactor common code so that it can use a different / provided 
kafka version?  Ditto for the DStream, unless you're just abandoning it.

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19888) Seeing offsets not resetting even when reset policy is configured explicitly

2017-03-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905730#comment-15905730
 ] 

Cody Koeninger commented on SPARK-19888:


That stack trace also shows a concurrent modification exception, yes?  See 
SPARK-19185 for that.

See e.g. SPARK-19680 for background on why offset out of range may occur on 
executor when it doesn't on driver.  Although if you're using reset latest, 
unless you have really short retention this is kind of surprising.

> Seeing offsets not resetting even when reset policy is configured explicitly
> 
>
> Key: SPARK-19888
> URL: https://issues.apache.org/jira/browse/SPARK-19888
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Justin Miller
>
> I was told to post this in a Spark ticket from KAFKA-4396:
> I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be 
> two separate errors, I'm not sure. What's puzzling is that I'm setting 
> auto.offset.reset to latest and it's still throwing an 
> OffsetOutOfRangeException, behavior that's contrary to the code. Please help! 
> :)
> {code}
> val kafkaParams = Map[String, Object](
>   "group.id" -> consumerGroup,
>   "bootstrap.servers" -> bootstrapServers,
>   "key.deserializer" -> classOf[ByteArrayDeserializer],
>   "value.deserializer" -> classOf[MessageRowDeserializer],
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "max.poll.records" -> persisterConfig.maxPollRecords,
>   "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>   "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>   "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>   "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
> )
> {code}
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
> on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
> 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {topic=231884473}
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
> 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 
> 39375) in 13444 ms on xyz (2/16)
> 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 
> 38843, xyz): 

[jira] [Commented] (SPARK-19863) Whether or not use CachedKafkaConsumer need to be configured, when you use DirectKafkaInputDStream to connect the kafka in a Spark Streaming application

2017-03-10 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905603#comment-15905603
 ] 

Cody Koeninger commented on SPARK-19863:


Isn't this basically a duplicate of
SPARK-19185
with the same implications?

> Whether or not use CachedKafkaConsumer need to be configured, when you use 
> DirectKafkaInputDStream to connect the kafka in a Spark Streaming application
> 
>
> Key: SPARK-19863
> URL: https://issues.apache.org/jira/browse/SPARK-19863
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Input/Output
>Affects Versions: 2.1.0
>Reporter: LvDongrong
>
> Whether or not to use CachedKafkaConsumer should be configurable when you use 
> DirectKafkaInputDStream to connect to kafka in a Spark Streaming 
> application. In Spark 2.x, the kafka consumer was replaced by 
> CachedKafkaConsumer (cached KafkaConsumer instances keep connections to the kafka 
> cluster open), and there is no way to change this. In fact, KafkaRDD (used by 
> DirectKafkaInputDStream to connect to kafka) provides the parameter 
> useConsumerCache to choose whether to use the CachedKafkaConsumer, but 
> DirectKafkaInputDStream always sets the parameter to true.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19904) SPIP Add Spark Project Improvement Proposal doc to website

2017-03-10 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-19904:
---
Description: 
see
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-Improvement-Proposals-td19268.html

> SPIP Add Spark Project Improvement Proposal doc to website
> --
>
> Key: SPARK-19904
> URL: https://issues.apache.org/jira/browse/SPARK-19904
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 2.1.0
>Reporter: Cody Koeninger
>  Labels: SPIP
>
> see
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-Improvement-Proposals-td19268.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19904) SPIP Add Spark Project Improvement Proposal doc to website

2017-03-10 Thread Cody Koeninger (JIRA)
Cody Koeninger created SPARK-19904:
--

 Summary: SPIP Add Spark Project Improvement Proposal doc to website
 Key: SPARK-19904
 URL: https://issues.apache.org/jira/browse/SPARK-19904
 Project: Spark
  Issue Type: Bug
  Components: Documentation
Affects Versions: 2.1.0
Reporter: Cody Koeninger






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2017-02-27 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887091#comment-15887091
 ] 

Cody Koeninger commented on SPARK-17147:


Dean, if you guys have any bandwidth to help test out 
https://github.com/koeninger/spark-1/tree/SPARK-17147 , I'm happy to iterate on 
whatever you find.

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
> --
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: Robert Conrad
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-02-26 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-18057:
---

I think you should ask Michael and / or Ryan what their plan is.




> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19680) Offsets out of range with no configured reset policy for partitions

2017-02-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878623#comment-15878623
 ] 

Cody Koeninger edited comment on SPARK-19680 at 2/22/17 4:25 PM:
-

The issue here is likely that you have lost data (because of retention 
expiration) between the time the batch was defined on the driver, and the time 
the executor attempted to process the batch.  Having executor consumers obey 
auto offset reset would result in silent data loss, which is a bad thing.

There's a more detailed description of the semantic issues around this for 
kafka in KAFKA-3370 and for structured streaming kafka in SPARK-17937

If you've got really aggressive retention settings and are having trouble 
getting a stream started, look at specifying earliest + some margin on startup 
as a workaround.  If you're having this trouble after a stream has been running 
for a while, you need more retention or smaller batches.
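
A rough sketch of that startup workaround (earliest + some margin), where kafkaParams 
is the usual consumer config map and the margin is an arbitrary illustrative value, 
not a recommendation:

{code}
import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Ask kafka for the current beginning offsets and start a little ahead of them, so
// records aged out between batch definition and execution don't immediately fail the job.
def earliestPlusMargin(kafkaParams: ju.Map[String, Object],
                       partitions: Seq[TopicPartition],
                       margin: Long): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
  try {
    consumer.beginningOffsets(partitions.asJava).asScala
      .map { case (tp, begin) => tp -> (begin + margin) }
      .toMap
  } finally {
    consumer.close()
  }
}
{code}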




was (Author: c...@koeninger.org):
The issue here is likely that you have lost data (because of retention 
expiration) between the time the batch was defined on the driver, and the time 
the executor attempted to process the batch.  Having executor consumers obey 
auto offset reset would result in silent data loss, which is a bad thing.

There's a more detailed description of the semantic issues around this for 
kafka in KAFKA-3370 and for structured streaming kafka in SPARK-17937



> Offsets out of range with no configured reset policy for partitions
> ---
>
> Key: SPARK-19680
> URL: https://issues.apache.org/jira/browse/SPARK-19680
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Schakmann Rene
>
> I'm using spark streaming with kafka to actually create a toplist. I want to 
> read all the messages in kafka. So I set
>"auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our spark cluster it is not working; I 
> get:
> Error:
> {code:title=error.log|borderStyle=solid}
>   Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, 
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, 
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {SearchEvents-2=161803385}
> {code}
> This is somehow wrong because I did set the auto.offset.reset property
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, 
> Object] = {
> Map(
>   "bootstrap.servers" -> 
> properties.getProperty("kafka.bootstrap.servers"),
>   "group.id" -> properties.getProperty("kafka.consumer.group"),
>   "auto.offset.reset" -> "earliest",
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "enable.auto.commit" -> "false",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, 
> Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: 
> Broadcast[KafkaSink[TopList]]): Unit = {
> getFilteredStream(stream.map(_.value()), windowDuration, 
> slideDuration).foreachRDD(rdd => {
>   val topList = new TopList
>   topList.setCreated(new Date())
>   topList.setTopListEntryList(rdd.take(TopListLength).toList)
>   CurrentLogger.info("TopList length: " + 
> topList.getTopListEntryList.size().toString)
>   kafkaSink.value.send(SendToTopicName, topList)
>   CurrentLogger.info("Last Run: " + System.currentTimeMillis())
> })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, 
> slideDuration: Int): DStream[TopListEntry] = {
> val Mapper = MapperObject.readerFor[SearchEventDTO]
> result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>   .filter(s => s != null && s.getSearchRequest != null && 
> s.getSearchRequest.getSearchParameters != null && s.getVertical == 
> Vertical.BAP && 
> s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>   .map(row => {
> val name = 
> row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
> (name, new TopListEntry(name, 1, row.getResultCount))
>   })
>   .reduceByKeyAndWindow(
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + 
> b.getMeanSearchHits),
> (a: TopListEntry, b: TopListEntry) => new 

[jira] [Commented] (SPARK-19680) Offsets out of range with no configured reset policy for partitions

2017-02-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878623#comment-15878623
 ] 

Cody Koeninger commented on SPARK-19680:


The issue here is likely that you have lost data (because of retention 
expiration) between the time the batch was defined on the driver, and the time 
the executor attempted to process the batch.  Having executor consumers obey 
auto offset reset would result in silent data loss, which is a bad thing.

There's a more detailed description of the semantic issues around this for 
kafka in KAFKA-3370 and for structured streaming kafka in SPARK-17937



> Offsets out of range with no configured reset policy for partitions
> ---
>
> Key: SPARK-19680
> URL: https://issues.apache.org/jira/browse/SPARK-19680
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Schakmann Rene
>
> I'm using spark streaming with kafka to actually create a toplist. I want to 
> read all the messages in kafka. So I set
>"auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our spark cluster it is not working; I 
> get:
> Error:
> {code:title=error.log|borderStyle=solid}
>   Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, 
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, 
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {SearchEvents-2=161803385}
> {code}
> This is somehow wrong because I did set the auto.offset.reset property
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, 
> Object] = {
> Map(
>   "bootstrap.servers" -> 
> properties.getProperty("kafka.bootstrap.servers"),
>   "group.id" -> properties.getProperty("kafka.consumer.group"),
>   "auto.offset.reset" -> "earliest",
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "enable.auto.commit" -> "false",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, 
> Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: 
> Broadcast[KafkaSink[TopList]]): Unit = {
> getFilteredStream(stream.map(_.value()), windowDuration, 
> slideDuration).foreachRDD(rdd => {
>   val topList = new TopList
>   topList.setCreated(new Date())
>   topList.setTopListEntryList(rdd.take(TopListLength).toList)
>   CurrentLogger.info("TopList length: " + 
> topList.getTopListEntryList.size().toString)
>   kafkaSink.value.send(SendToTopicName, topList)
>   CurrentLogger.info("Last Run: " + System.currentTimeMillis())
> })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, 
> slideDuration: Int): DStream[TopListEntry] = {
> val Mapper = MapperObject.readerFor[SearchEventDTO]
> result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>   .filter(s => s != null && s.getSearchRequest != null && 
> s.getSearchRequest.getSearchParameters != null && s.getVertical == 
> Vertical.BAP && 
> s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>   .map(row => {
> val name = 
> row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
> (name, new TopListEntry(name, 1, row.getResultCount))
>   })
>   .reduceByKeyAndWindow(
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + 
> b.getMeanSearchHits),
> (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - 
> b.getMeanSearchHits),
> Minutes(windowDuration),
> Seconds(slideDuration))
>   .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>   .map(row => (row._2.getSearchCount, row._2))
>   .transform(rdd => rdd.sortByKey(ascending = false))
>   .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, 
> row._2.getMeanSearchHits / row._2.getSearchCount))
>   }
>   def main(properties: Properties): Unit = {
> val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
> val kafkaSink = 
> sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
> val kafkaParams: Map[String, Object] = 
> 

[jira] [Resolved] (SPARK-19361) kafka.maxRatePerPartition for compacted topic cause exception

2017-01-26 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger resolved SPARK-19361.

Resolution: Duplicate

> kafka.maxRatePerPartition for compacted topic cause exception
> -
>
> Key: SPARK-19361
> URL: https://issues.apache.org/jira/browse/SPARK-19361
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 1.6.1
>Reporter: Natalia Gorchakova
>
> creating DirectKafkaInputDStream with param 
> spark.streaming.kafka.maxRatePerPartition for a compacted topic causes an exception:
> ERROR [Executor task launch worker-2] executor.Executor: Exception in task 
> 1.0 in stage 2.0 (TID 22)
> java.lang.AssertionError: assertion failed: Got 3740923 > ending offset 
> 2428156 for topic COMPACTED.KAFKA.TOPIC partition 6 start 2228156. This 
> should not happen, and indicates a message may have been skipped
>   at scala.Predef$.assert(Predef.scala:179)
>   at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:217)
> as KafkaRDD expects maxOffset in batch <= startOffset + 
> maxRatePerPartition*secondsInBatch, while for a compacted topic some offsets 
> can be missing. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19361) kafka.maxRatePerPartition for compacted topic cause exception

2017-01-26 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839683#comment-15839683
 ] 

Cody Koeninger commented on SPARK-19361:


Compacted topics in general don't work with the direct stream; this isn't unique to 
maxRate.

There's an existing ticket on this, 
https://issues.apache.org/jira/browse/SPARK-17147 , with work-in-progress code. 
If this feature is important to you, go help test it.

> kafka.maxRatePerPartition for compacted topic cause exception
> -
>
> Key: SPARK-19361
> URL: https://issues.apache.org/jira/browse/SPARK-19361
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 1.6.1
>Reporter: Natalia Gorchakova
>
> creating DirectKafkaInputDStream with param 
> spark.streaming.kafka.maxRatePerPartition for a compacted topic causes an exception:
> ERROR [Executor task launch worker-2] executor.Executor: Exception in task 
> 1.0 in stage 2.0 (TID 22)
> java.lang.AssertionError: assertion failed: Got 3740923 > ending offset 
> 2428156 for topic COMPACTED.KAFKA.TOPIC partition 6 start 2228156. This 
> should not happen, and indicates a message may have been skipped
>   at scala.Predef$.assert(Predef.scala:179)
>   at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:217)
> as KafkaRDD expects maxOffset in batch <= startOffset + 
> maxRatePerPartition*secondsInBatch, while for a compacted topic some offsets 
> can be missing. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.1.0

2017-01-25 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837844#comment-15837844
 ] 

Cody Koeninger commented on SPARK-18057:


If you can get committer agreement on the outstanding question (whether this
can be a separate artifact) and are willing to test on your workload, I'm
happy to do the work.




> Update structured streaming kafka from 10.0.1 to 10.1.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19185) ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing

2017-01-17 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15826527#comment-15826527
 ] 

Cody Koeninger commented on SPARK-19185:


I'd expect setting cache capacity to zero to cause failures, but it's probably 
(slightly) faster to try than just patching the lines of code I pointed out.

In general, a single application using kafka consumers should not be reading 
from different places in the same topicpartition in different threads, because 
it breaks ordering guarantees.  That's an implication of Kafka semantics, not 
Spark semantics.  That's why the consumer cache exists the way it does.

Changing that behavior on a widespread basis is going to break ordering 
guarantees, which will break some people's existing jobs.  Hence my comments 
about arguing design decisions with committers.

> ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing
> -
>
> Key: SPARK-19185
> URL: https://issues.apache.org/jira/browse/SPARK-19185
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: Spark 2.0.2
> Spark Streaming Kafka 010
> Mesos 0.28.0 - client mode
> spark.executor.cores 1
> spark.mesos.extra.cores 1
>Reporter: Kalvin Chau
>  Labels: streaming, windowing
>
> We've been running into ConcurrentModificationExceptions "KafkaConsumer is 
> not safe for multi-threaded access" with the CachedKafkaConsumer. I've been 
> working through debugging this issue and after looking through some of the 
> spark source code I think this is a bug.
> Our set up is:
> Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using 
> Spark-Streaming-Kafka-010
> spark.executor.cores 1
> spark.mesos.extra.cores 1
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
> We would see the exception when in one executor there are two task worker 
> threads assigned the same Topic+Partition, but a different set of offsets.
> They would both get the same CachedKafkaConsumer, and whichever task thread 
> went first would seek and poll for all the records, and at the same time the 
> second thread would try to seek to its offset but fail because it is unable 
> to acquire the lock.
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
> Here are some relevant logs:
> {code}
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing 
> topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing 
> topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: 
> Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 
> 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: 
> Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 
> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: 
> Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: 
> Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting 
> block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block 
> rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in 
> task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>   at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

[jira] [Commented] (SPARK-19185) ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing

2017-01-15 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823164#comment-15823164
 ] 

Cody Koeninger commented on SPARK-19185:


This is a good error report, sorry it's taken me a while to get back to you on 
this.

My immediate suggestions to you as a workaround would be
- Try persist before windowing, so that batches of offsets from Kafka are only 
fetched once, rather than repeatedly and possibly simultaneously for a given 
kafka partition.  I'm assuming that's the underlying issue, but could be wrong.
- Failing that, KafkaRDD's constructor takes a boolean parameter indicating 
whether to use the consumer cache.  You can straightforwardly modify 
DirectKafkaInputDStream.compute to pass false.  This will require rebuilding 
only the kafka consumer jar, not redeploying all of spark.  This will be a 
performance hit, especially if you're using SSL, but is better than nothing.

Fixing this in the Spark master branch (either by allowing configuration of 
whether to use the consumer cache, or replacing the consumer cache with a pool 
of consumers with different group ids for the same topicpartition) is going to 
require getting the attention of a committer.  I don't really have the time to 
mess with that right now (happy to do the work, but zero interest in tracking 
down committers and arguing design decisions).

That being said, if one of the workarounds suggested above doesn't help you, 
let me know.
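A minimal sketch of the first workaround above (persist before windowing), assuming the 0-10 direct stream and using illustrative names; the point is that each Kafka batch is fetched once and overlapping window tasks reuse the cached blocks rather than re-reading the same topicpartition concurrently:

{code:title=PersistBeforeWindow.scala|borderStyle=solid}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream

// Persist the mapped stream before applying the window, so overlapping
// windows read cached blocks instead of hitting the cached Kafka consumer
// from multiple tasks for the same topicpartition.
def windowWithSingleFetch(
    values: DStream[Array[Byte]],
    windowMinutes: Int,
    slideSeconds: Int)(process: RDD[Array[Byte]] => Unit): Unit = {
  values.persist(StorageLevel.MEMORY_AND_DISK_SER)
  values
    .window(Minutes(windowMinutes), Seconds(slideSeconds))
    .foreachRDD(process)
}
{code}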


> ConcurrentModificationExceptions with CachedKafkaConsumers when Windowing
> -
>
> Key: SPARK-19185
> URL: https://issues.apache.org/jira/browse/SPARK-19185
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: Spark 2.0.2
> Spark Streaming Kafka 010
> Mesos 0.28.0 - client mode
> spark.executor.cores 1
> spark.mesos.extra.cores 1
>Reporter: Kalvin Chau
>  Labels: streaming, windowing
>
> We've been running into ConcurrentModificationExceptions "KafkaConsumer is 
> not safe for multi-threaded access" with the CachedKafkaConsumer. I've been 
> working through debugging this issue and after looking through some of the 
> spark source code I think this is a bug.
> Our set up is:
> Spark 2.0.2, running in Mesos 0.28.0-2 in client mode, using 
> Spark-Streaming-Kafka-010
> spark.executor.cores 1
> spark.mesos.extra.cores 1
> Batch interval: 10s, window interval: 180s, and slide interval: 30s
> We would see the exception when in one executor there are two task worker 
> threads assigned the same Topic+Partition, but a different set of offsets.
> They would both get the same CachedKafkaConsumer, and whichever task thread 
> went first would seek and poll for all the records, and at the same time the 
> second thread would try to seek to its offset but fail because it is unable 
> to acquire the lock.
> Time0 E0 Task0 - TopicPartition("abc", 0) X to Y
> Time0 E0 Task1 - TopicPartition("abc", 0) Y to Z
> Time1 E0 Task0 - Seeks and starts to poll
> Time1 E0 Task1 - Attempts to seek, but fails
> Here are some relevant logs:
> {code}
> 17/01/06 03:10:01 Executor task launch worker-1 INFO KafkaRDD: Computing 
> topic test-topic, partition 2 offsets 4394204414 -> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO KafkaRDD: Computing 
> topic test-topic, partition 2 offsets 4394238058 -> 4394257712
> 17/01/06 03:10:01 Executor task launch worker-1 DEBUG CachedKafkaConsumer: 
> Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 
> 4394204414
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: 
> Get spark-executor-consumer test-topic 2 nextOffset 4394204414 requested 
> 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 INFO CachedKafkaConsumer: 
> Initial fetch for spark-executor-consumer test-topic 2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 DEBUG CachedKafkaConsumer: 
> Seeking to test-topic-2 4394238058
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Putting 
> block rdd_199_2 failed due to an exception
> 17/01/06 03:10:01 Executor task launch worker-0 WARN BlockManager: Block 
> rdd_199_2 could not be removed as it was not found on disk or in memory
> 17/01/06 03:10:01 Executor task launch worker-0 ERROR Executor: Exception in 
> task 49.0 in stage 45.0 (TID 3201)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>   at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>   at 
> 

[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2016-12-12 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742285#comment-15742285
 ] 

Cody Koeninger commented on SPARK-17147:


If compacted topics are important to you, then you should help test the branch 
listed above.

Yes, you can try increasing poll.ms
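For reference, a minimal sketch of where that setting goes (the 10s value is just an example, not a recommendation):

{code:title=PollTimeout.scala|borderStyle=solid}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// spark.streaming.kafka.consumer.poll.ms controls how long the executor-side
// cached consumer waits for records before giving up on a poll.
val conf = new SparkConf()
  .setAppName("compacted-topic-test")
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")

val ssc = new StreamingContext(conf, Seconds(5))
{code}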

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
> --
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: Robert Conrad
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17147) Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2016-12-07 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15729055#comment-15729055
 ] 

Cody Koeninger commented on SPARK-17147:


This ticket is about createDirectStream.  The question of whether it will be 
fixed is largely down to whether it's important enough to Sean or someone else 
to help test it thoroughly.

The stack trace you posted more than likely has nothing to do with this ticket, 
especially if you aren't using log compaction.  It's probably a network issue.  
Adjust spark.streaming.kafka.consumer.poll.ms, or do more investigation into 
what's going on with your network / Kafka.

> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets 
> (i.e. Log Compaction)
> --
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.0
>Reporter: Robert Conrad
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset 
> will always be just an increment of 1 above the previous offset. 
> I have worked around this problem by changing CachedKafkaConsumer to use the 
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the 
> streaming kafka consumer, and I'm happy to close this out if that's the case. 
> If, however, it is supposed to support non-consecutive offsets (e.g. due to 
> log compaction) I am also happy to contribute a PR.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18682) Batch Source for Kafka

2016-12-04 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15720146#comment-15720146
 ] 

Cody Koeninger commented on SPARK-18682:


Isn't this a duplicate of https://issues.apache.org/jira/browse/SPARK-18386

Regarding limit, that would need to be a per partition limit, either explicitly 
or implicitly (divide n by number of partitions)
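Purely as an illustration of that arithmetic (the helper is hypothetical, not an existing Spark API):

{code:title=PerPartitionLimit.scala|borderStyle=solid}
// Turn a total row limit n into a per-partition cap, rounding up so that
// the union of all partitions can still cover n rows.
def perPartitionLimit(totalLimit: Long, numPartitions: Int): Long = {
  require(numPartitions > 0, "need at least one partition")
  (totalLimit + numPartitions - 1) / numPartitions
}

// e.g. limit(1000) over a 3-partition topic caps each partition at 334 records
val cap = perPartitionLimit(1000L, 3)
{code}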

> Batch Source for Kafka
> --
>
> Key: SPARK-18682
> URL: https://issues.apache.org/jira/browse/SPARK-18682
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL, Structured Streaming
>Reporter: Michael Armbrust
>
> Today, you can start a stream that reads from kafka.  However, given kafka's 
> configurable retention period, it seems like sometimes you might just want to 
> read all of the data that is available now.  As such we should add a version 
> that works with {{spark.read}} as well.
> The options should be the same as the streaming kafka source, with the 
> following differences:
>  - {{startingOffsets}} should default to earliest, and should not allow 
> {{latest}} (which would always be empty).
>  - {{endingOffsets}} should also be allowed and should default to {{latest}}. 
> the same assign json format as {{startingOffsets}} should also be accepted.
> It would be really good, if things like {{.limit\(n\)}} were enough to 
> prevent all the data from being read (this might just work).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-18682) Batch Source for Kafka

2016-12-04 Thread Cody Koeninger (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger updated SPARK-18682:
---
Comment: was deleted

(was: Isn't this a duplicate of 
https://issues.apache.org/jira/browse/SPARK-18386

Regarding limit, that would need to be a per partition limit, either explicitly 
or implicitly (divide n by number of partitions))

> Batch Source for Kafka
> --
>
> Key: SPARK-18682
> URL: https://issues.apache.org/jira/browse/SPARK-18682
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL, Structured Streaming
>Reporter: Michael Armbrust
>
> Today, you can start a stream that reads from kafka.  However, given kafka's 
> configurable retention period, it seems like sometimes you might just want to 
> read all of the data that is available now.  As such we should add a version 
> that works with {{spark.read}} as well.
> The options should be the same as the streaming kafka source, with the 
> following differences:
>  - {{startingOffsets}} should default to earliest, and should not allow 
> {{latest}} (which would always be empty).
>  - {{endingOffsets}} should also be allowed and should default to {{latest}}. 
> the same assign json format as {{startingOffsets}} should also be accepted.
> It would be really good, if things like {{.limit\(n\)}} were enough to 
> prevent all the data from being read (this might just work).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18682) Batch Source for Kafka

2016-12-04 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15720145#comment-15720145
 ] 

Cody Koeninger commented on SPARK-18682:


Isn't this a duplicate of https://issues.apache.org/jira/browse/SPARK-18386

Regarding limit, that would need to be a per partition limit, either explicitly 
or implicitly (divide n by number of partitions)

> Batch Source for Kafka
> --
>
> Key: SPARK-18682
> URL: https://issues.apache.org/jira/browse/SPARK-18682
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL, Structured Streaming
>Reporter: Michael Armbrust
>
> Today, you can start a stream that reads from kafka.  However, given kafka's 
> configurable retention period, it seems like sometimes you might just want to 
> read all of the data that is available now.  As such we should add a version 
> that works with {{spark.read}} as well.
> The options should be the same as the streaming kafka source, with the 
> following differences:
>  - {{startingOffsets}} should default to earliest, and should not allow 
> {{latest}} (which would always be empty).
>  - {{endingOffsets}} should also be allowed and should default to {{latest}}. 
> the same assign json format as {{startingOffsets}} should also be accepted.
> It would be really good, if things like {{.limit\(n\)}} were enough to 
> prevent all the data from being read (this might just work).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-12-01 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712457#comment-15712457
 ] 

Cody Koeninger commented on SPARK-18506:


Yes, amazon linux.  No, not spark-ec2, just a spark tarball 
(spark-2.0.2-bin-hadoop2.7.tgz) unpacked on the master and workers.  Only 
setting changed was conf/slaves

> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic
> ---
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 
> with Kafka 0.10.1.0.   
>Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our 
> drivers to read all partitions of a single stream when kafka 
> auto.offset.reset=earliest running on a real cluster(separate VM nodes). 
> When we run our drivers with auto.offset.reset=latest ingesting from a single 
> kafka topic with multiple partitions (usually 10 but problem shows up  with 
> only 3 partitions), the driver reads correctly from all partitions.  
> Unfortunately, we need "earliest" for exactly once semantics.
> In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations in trying to isolate our problem 
> but it is only auto.offset.reset=earliest on a "real multi-machine cluster" 
> which causes this problem.
> 1. Ran with spark standalone cluster(4 Debian nodes, 8vCPU/30GB each)  
> instead of YARN 2.7.3. Single partition read problem persists both cases. 
> Please note this problem occurs on an actual cluster of separate VM nodes 
> (but not when our engineer runs in as a cluster on his own Mac.)
> 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest scala driver that only logs.  (Our team uses java.) 
> Broken with both.
> 8. Tried increasing GCE capacity for cluster but already we were highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores.  Since driver works with auto.offset.reset=latest, we have ruled out 
> GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being set to 
> different offset configuration even though the consumer config correctly 
> indicates auto.offset.reset=earliest. 
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>  to broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>  from broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> {noformat}
> I've enclosed below the completely stripped down trivial test driver that 
> shows this behavior.  After spending 2 weeks trying all combinations with a 
> really stripped down driver, we think either there might be a 

[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-29 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15706814#comment-15706814
 ] 

Cody Koeninger commented on SPARK-18475:


Glad you agree it shouldn't be enabled by default.

If you're in an organization where you are responsible for shit that other 
people broke, but have no power to actually fix it correctly...  I'm not sure 
there's anything useful I can say there.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-29 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15706758#comment-15706758
 ] 

Cody Koeninger commented on SPARK-18475:


Burak hasn't empirically shown that it is of benefit for a properly 
partitioned, non-skewed kafka topic, especially if SSL is enabled (because of 
the effect on consumer caching).

Any output operation can tell the difference in ordering.

People are welcome to convince you that this is a worthwhile option, but there 
is no way it should be on by default.

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-28 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15704372#comment-15704372
 ] 

Cody Koeninger commented on SPARK-18506:


1 x spark master is m3 medium
2 x spark workers are m3 xlarge
Looking back at that particular testing setup, kafka and ZK were on a single m3 
large, which is admittedly unrealistic.
I'm a little too busy at the moment to try again with a more realistic setup 
though.

> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic
> ---
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 
> with Kafka 0.10.1.0.   
>Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our 
> drivers to read all partitions of a single stream when kafka 
> auto.offset.reset=earliest running on a real cluster(separate VM nodes). 
> When we run our drivers with auto.offset.reset=latest ingesting from a single 
> kafka topic with multiple partitions (usually 10 but problem shows up  with 
> only 3 partitions), the driver reads correctly from all partitions.  
> Unfortunately, we need "earliest" for exactly once semantics.
> In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations in trying to isolate our problem 
> but it is only auto.offset.reset=earliest on a "real multi-machine cluster" 
> which causes this problem.
> 1. Ran with spark standalone cluster(4 Debian nodes, 8vCPU/30GB each)  
> instead of YARN 2.7.3. Single partition read problem persists both cases. 
> Please note this problem occurs on an actual cluster of separate VM nodes 
> (but not when our engineer runs in as a cluster on his own Mac.)
> 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest scala driver that only logs.  (Our team uses java.) 
> Broken with both.
> 8. Tried increasing GCE capacity for cluster but already we were highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores.  Since driver works with auto.offset.reset=latest, we have ruled out 
> GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being set to 
> different offset configuration even though the consumer config correctly 
> indicates auto.offset.reset=earliest. 
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>  to broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>  from broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> {noformat}
> I've enclosed below the completely stripped down trivial test driver that 
> shows this behavior.  After spending 2 

[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-23 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15691386#comment-15691386
 ] 

Cody Koeninger commented on SPARK-18506:


I'm really confused by that - did you try a completely default install of
Kafka / spark?




> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic
> ---
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 
> with Kafka 0.10.1.0.   
>Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our 
> drivers to read all partitions of a single stream when kafka 
> auto.offset.reset=earliest running on a real cluster(separate VM nodes). 
> When we run our drivers with auto.offset.reset=latest ingesting from a single 
> kafka topic with multiple partitions (usually 10 but problem shows up  with 
> only 3 partitions), the driver reads correctly from all partitions.  
> Unfortunately, we need "earliest" for exactly once semantics.
> In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations in trying to isolate our problem 
> but it is only auto.offset.reset=earliest on a "real multi-machine cluster" 
> which causes this problem.
> 1. Ran with spark standalone cluster(4 Debian nodes, 8vCPU/30GB each)  
> instead of YARN 2.7.3. Single partition read problem persists both cases. 
> Please note this problem occurs on an actual cluster of separate VM nodes 
> (but not when our engineer runs in as a cluster on his own Mac.)
> 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest scala driver that only logs.  (Our team uses java.) 
> Broken with both.
> 8. Tried increasing GCE capacity for cluster but already we were highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores.  Since driver works with auto.offset.reset=latest, we have ruled out 
> GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being set to 
> different offset configuration even though the consumer config correctly 
> indicates auto.offset.reset=earliest. 
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>  to broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>  from broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> {noformat}
> I've enclosed below the completely stripped down trivial test driver that 
> shows this behavior.  After spending 2 weeks trying all combinations with a 
> really stripped down driver, we think either there might be a bug in the 
> kafka spark integration or if the kafka 0.10/spark upgrade 

[jira] [Commented] (SPARK-18525) Kafka DirectInputStream cannot be aware of new partition

2016-11-23 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15690915#comment-15690915
 ] 

Cody Koeninger commented on SPARK-18525:


Easiest thing to do is just restart your streaming job after you add partitions.

> Kafka DirectInputStream cannot be aware of new partition
> 
>
> Key: SPARK-18525
> URL: https://issues.apache.org/jira/browse/SPARK-18525
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.0.2
>Reporter: Zhiwen Sun
>
> It seems that DirectKafkaInputStream does not support reading new partitions while 
> spark streaming is running.
> Related spark code:
> https://github.com/apache/spark/blob/v2.0.2/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L101
> How to produce it:
> {code:title=KafkaDirectTest.scala|borderStyle=solid}
> object KafkaDirectTest {
>   def main(args: Array[String]) {
> val conf = new SparkConf().setAppName("kafka direct test 5")
> conf.setIfMissing("spark.master", "local[3]")
> conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
> val ssc = new StreamingContext(conf, Seconds(1))
> val zkQuorum = Config("common").getString("kafka.zkquorum")
> val topic = "test_use"
> val groupId = "stream-test-0809"
> val kafkaParams = Map(
>   "metadata.broker.list" -> "dev-002:9092,dev-004:9092",
>   "group.id" -> groupId
> )
> val fromOffsets: Map[TopicAndPartition, Long] = Map(
>   new TopicAndPartition(topic, 0) -> 0L,
>   new TopicAndPartition(topic, 1) -> 0L,
>   new TopicAndPartition(topic, 2) -> 0L,
>   new TopicAndPartition(topic, 3) -> 0L
> )
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams, Set(topic))
> lines.foreachRDD { rdd =>
>   rdd.foreach { row =>
> println(s"\n row: ${row} ")
>   }
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   offsetRanges.foreach { offset =>
> println(s"\n- offset: ${offset.topic} ${offset.partition} 
> ${offset.fromOffset} ${offset.untilOffset}")
>   }
> }
> ssc.start()
> ssc.awaitTermination()
>   }
> }
> {code}
> 1. start the job
> 2. add new partition of test_use topic
> The job cannot read new partition data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18525) Kafka DirectInputStream cannot be aware of new partition

2016-11-22 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686746#comment-15686746
 ] 

Cody Koeninger commented on SPARK-18525:


0.8 works only against defined partitions. Use 0.10 and subscribePattern
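A minimal sketch of the 0.10 integration with SubscribePattern (broker list, group id and pattern reuse the reporter's example values and are placeholders); partitions added to matching topics are picked up at the consumer's next metadata refresh:

{code:title=SubscribePatternExample.scala|borderStyle=solid}
import java.util.regex.Pattern
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf().setAppName("subscribe-pattern-example")
val ssc = new StreamingContext(conf, Seconds(1))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "dev-002:9092,dev-004:9092",
  "group.id" -> "stream-test-0809",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Subscribe by pattern instead of a fixed partition map, so topics (and
// partitions) matching the regex are assigned as they appear.
val lines = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("test_use.*"), kafkaParams)
)

lines.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))
ssc.start()
ssc.awaitTermination()
{code}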

> Kafka DirectInputStream cannot be aware of new partition
> 
>
> Key: SPARK-18525
> URL: https://issues.apache.org/jira/browse/SPARK-18525
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.0.2
>Reporter: Zhiwen Sun
>
> It seems that DirectKafkaInputStream does not support reading new partitions while 
> spark streaming is running.
> Related spark code:
> https://github.com/apache/spark/blob/v2.0.2/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L101
> How to produce it:
> {code:title=KafkaDirectTest.scala|borderStyle=solid}
> object KafkaDirectTest {
>   def main(args: Array[String]) {
> val conf = new SparkConf().setAppName("kafka direct test 5")
> conf.setIfMissing("spark.master", "local[3]")
> conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
> val ssc = new StreamingContext(conf, Seconds(1))
> val zkQuorum = Config("common").getString("kafka.zkquorum")
> val topic = "test_use"
> val groupId = "stream-test-0809"
> val kafkaParams = Map(
>   "metadata.broker.list" -> "dev-002:9092,dev-004:9092",
>   "group.id" -> groupId
> )
> val fromOffsets: Map[TopicAndPartition, Long] = Map(
>   new TopicAndPartition(topic, 0) -> 0L,
>   new TopicAndPartition(topic, 1) -> 0L,
>   new TopicAndPartition(topic, 2) -> 0L,
>   new TopicAndPartition(topic, 3) -> 0L
> )
> val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
> val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams, Set(topic))
> lines.foreachRDD { rdd =>
>   rdd.foreach { row =>
> println(s"\n row: ${row} ")
>   }
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   offsetRanges.foreach { offset =>
> println(s"\n- offset: ${offset.topic} ${offset.partition} 
> ${offset.fromOffset} ${offset.untilOffset}")
>   }
> }
> ssc.start()
> ssc.awaitTermination()
>   }
> }
> {code}
> 1. start the job
> 2. add new partition of test_use topic
> The job cannot read new partition data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-21 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15685190#comment-15685190
 ] 

Cody Koeninger commented on SPARK-18506:


I'd try to isolate aws vs gce as a possible cause before filing a bug against 
the kafka consumer.

I don't get paid to work on Spark, so most any time spent on it is during 
non-work hours regardless :)

> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic
> ---
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 
> with Kafka 0.10.1.0.   
>Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our 
> drivers to read all partitions of a single stream when kafka 
> auto.offset.reset=earliest running on a real cluster(separate VM nodes). 
> When we run our drivers with auto.offset.reset=latest ingesting from a single 
> kafka topic with multiple partitions (usually 10 but problem shows up  with 
> only 3 partitions), the driver reads correctly from all partitions.  
> Unfortunately, we need "earliest" for exactly once semantics.
> In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations in trying to isolate our problem 
> but it is only auto.offset.reset=earliest on a "real multi-machine cluster" 
> which causes this problem.
> 1. Ran with spark standalone cluster(4 Debian nodes, 8vCPU/30GB each)  
> instead of YARN 2.7.3. Single partition read problem persists both cases. 
> Please note this problem occurs on an actual cluster of separate VM nodes 
> (but not when our engineer runs in as a cluster on his own Mac.)
> 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest scala driver that only logs.  (Our team uses java.) 
> Broken with both.
> 8. Tried increasing GCE capacity for cluster but already we were highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores.  Since driver works with auto.offset.reset=latest, we have ruled out 
> GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being set to 
> different offset configuration even though the consumer config correctly 
> indicates auto.offset.reset=earliest. 
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>  to broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse 
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>  from broker 10.102.20.13:9092 (id: 13 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> {noformat}
> I've enclosed below the completely stripped down trivial test driver that 
> shows this behavior.  After spending 2 weeks trying all combinations with a 
> really stripped down 

[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-20 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15682216#comment-15682216
 ] 

Cody Koeninger commented on SPARK-18506:


I tried your example code on an AWS 2-node spark standalone cluster, and am still not 
able to reproduce the issue.

[ec2-user@ip-10-0-2-58 spark-2.0.2-bin-hadoop2.7]$ ./bin/spark-submit --master 
spark://ip-10-0-2-58.ec2.internal:7077 --class example.SimpleKafkaLoggingDriver 
~/kafka-example-assembly-2.0.0.jar 10.0.2.96:9092 simple_logtest mygroup 
earliest

16/11/21 01:41:31 INFO JobScheduler: Added jobs for time 147969249 ms
simple_logtest 3 offsets: 0 to 62
simple_logtest 0 offsets: 0 to 61
simple_logtest 1 offsets: 0 to 62
simple_logtest 2 offsets: 0 to 61
simple_logtest 4 offsets: 0 to 62
16/11/21 01:41:31 INFO JobScheduler: Finished job streaming job 147969249 
ms.0 from job set of time 147969249 ms
16/11/21 01:41:31 INFO ReceivedBlockTracker: Deleting batches: 
16/11/21 01:41:31 INFO JobScheduler: Total delay: 1.946 s for time 
147969249 ms (execution: 0.009 s)
16/11/21 01:41:32 INFO InputInfoTracker: remove old batch metadata: 
simple_logtest 3 offsets: 62 to 62
simple_logtest 0 offsets: 61 to 61
simple_logtest 1 offsets: 62 to 62
simple_logtest 2 offsets: 61 to 61
simple_logtest 4 offsets: 62 to 62
16/11/21 01:41:35 INFO JobScheduler: Starting job streaming job 1479692495000 
ms.0 from job set of time 1479692495000 ms

What happens when you use ConsumerStrategies.Assign to start at 0 for the 
partitions in question?
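
For anyone who wants to try that, here is a minimal sketch (my own, not the 
reporter's driver) of a logging-only job that uses ConsumerStrategies.Assign 
to pin every partition of the topic to offset 0, so auto.offset.reset never 
comes into play. The broker address, topic name, group id and partition count 
are placeholders taken from the spark-submit run above.

{code:scala}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

object SimpleKafkaAssignDriver {
  def main(args: Array[String]): Unit = {
    val brokers = "10.0.2.96:9092"   // placeholder, as in the spark-submit run above
    val topic = "simple_logtest"     // placeholder topic with 5 partitions
    val groupId = "mygroup"          // placeholder group id
    val numPartitions = 5

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Explicitly assign every partition and start each one at offset 0, so
    // auto.offset.reset is never consulted.
    val partitions = (0 until numPartitions).map(new TopicPartition(topic, _))
    val fromOffsets = partitions.map(tp => tp -> 0L).toMap

    val ssc = new StreamingContext(
      new SparkConf().setAppName("SimpleKafkaAssignDriver"), Seconds(5))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](partitions, kafkaParams, fromOffsets))

    // Log the offset range seen for each partition in every batch.
    stream.foreachRDD { rdd =>
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

If all five partitions advance from 0 with Assign but not with Subscribe plus 
auto.offset.reset=earliest, that would narrow the problem down to the 
earliest-offset lookup rather than the partition assignment itself.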

> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a 
> single partition on a multi partition topic
> ---
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark 
> standalone mode 2.0.2 
> with Kafka 0.10.1.0.   
>Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka 
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2), and we cannot get our 
> drivers to read all partitions of a single stream when Kafka 
> auto.offset.reset=earliest is set, running on a real cluster (separate VM 
> nodes). 
> When we run our drivers with auto.offset.reset=latest, ingesting from a 
> single Kafka topic with multiple partitions (usually 10, but the problem 
> shows up with only 3 partitions), the driver reads correctly from all 
> partitions. Unfortunately, we need "earliest" for exactly-once semantics.
> In the same Kafka 0.10.1.0/Spark 2.x setup, our legacy driver using 
> spark-streaming-kafka-0-8_2.11 with the prior setting 
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations while trying to isolate our 
> problem, but it is only auto.offset.reset=earliest on a "real multi-machine 
> cluster" that causes this problem.
> 1. Ran with a Spark standalone cluster (4 Debian nodes, 8vCPU/30GB each) 
> instead of YARN 2.7.3. The single-partition read problem persists in both 
> cases. Please note this problem occurs on an actual cluster of separate VM 
> nodes (but not when our engineer runs it as a cluster on his own Mac).
> 2. Ran with the Spark 2.1 nightly build for the last 10 days. Problem 
> persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy options, RangeAssignor and 
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with 
> both.
> 7. Tried the simplest Scala driver that only logs. (Our team uses Java.) 
> Broken with both.
> 8. Tried increasing GCE capacity for the cluster, but we were already highly 
> overprovisioned for cores and memory. Also tried ramping up executors and 
> cores. Since the driver works with auto.offset.reset=latest, we have ruled 
> out GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being set to 
> different offset configurations even though the consumer config correctly 
> indicates auto.offset.reset=earliest. 
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest 
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>  to broker 10.102.20.12:9092 (id: 12 rack: null) 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest 
> 

[jira] [Commented] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2016-11-20 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15682171#comment-15682171
 ] 

Cody Koeninger commented on SPARK-18475:


An iterator certainly does have an ordering guarantee, and it's pretty 
straightforward to figure out whether a given operation shuffles.  Plenty of 
jobs have been written depending on that ordering guarantee, and it's 
documented for the Direct Stream.

The only reason it's a significant performance improvement is that the OP is 
misusing Kafka. If he had reasonably even production into a reasonable number 
of partitions, there would be no performance improvement.

You guys might be able to convince Michael this is a good idea, but as I said, 
this isn't the first time this has come up, and my answer isn't likely to 
change. I'm not "blocking" anything; I'm not a gatekeeper and have no more 
rights than you do. I just think it's a really bad idea.
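
To make the ordering point concrete, here is a small sketch (mine, not code 
from this thread) of the documented direct-stream pattern: each Spark 
partition maps to exactly one Kafka TopicPartition, and as long as nothing 
shuffles in between, the iterator handed to each task yields records in offset 
order. The names consumeInOffsetOrder, stream, and process are assumptions, 
standing in for a KafkaUtils.createDirectStream result and whatever 
order-sensitive sink logic a job actually has.

{code:scala}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.TaskContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

object OrderedConsumeExample {
  // `stream` is assumed to come straight from KafkaUtils.createDirectStream,
  // with no repartition or other shuffle applied before this point.
  def consumeInOffsetOrder(
      stream: InputDStream[ConsumerRecord[String, String]],
      process: ConsumerRecord[String, String] => Unit): Unit = {
    stream.foreachRDD { rdd =>
      // One OffsetRange (and one Spark partition) per Kafka TopicPartition.
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val range = ranges(TaskContext.get.partitionId)
        println(s"processing ${range.topic}-${range.partition} " +
          s"offsets ${range.fromOffset} to ${range.untilOffset} in order")
        // Records arrive here in Kafka offset order; splitting one
        // TopicPartition across several tasks would require a shuffle and
        // lose that ordering.
        iter.foreach(process)
      }
    }
  }
}
{code}

That per-partition ordering is exactly what having multiple tasks read from 
the same TopicPartition would give up.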

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is designed for (being cached) in this use case, but the extra 
> overhead is worth it for handling data skew and increasing parallelism, 
> especially in ETL use cases.
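
For comparison, a hedged sketch of what is already possible without changing 
the source: repartition the streamed DataFrame immediately after the read. The 
shuffle it introduces gives up the per-TopicPartition ordering discussed in the 
comment above, but it does spread a skewed partition's records across more 
tasks. The broker address, topic, parallelism of 40, and checkpoint path are 
placeholders, not values from this ticket.

{code:scala}
import org.apache.spark.sql.SparkSession

object RepartitionAfterKafkaRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-skew-workaround").getOrCreate()

    // Requires the spark-sql-kafka-0-10 package on the classpath.
    val kafka = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.0.2.96:9092") // placeholder broker
      .option("subscribe", "simple_test")                  // placeholder topic
      .load()

    // Explicit shuffle: downstream stages run with 40 tasks regardless of how
    // many TopicPartitions the topic has, at the cost of the extra exchange.
    val query = kafka
      .repartition(40)
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/kafka-skew-checkpoint") // placeholder path
      .start()

    query.awaitTermination()
  }
}
{code}

Whether that extra exchange is an acceptable cost is essentially the trade-off 
being debated in this ticket.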



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


