[jira] [Commented] (SPARK-19677) HDFSBackedStateStoreProvider fails to overwrite existing file
[ https://issues.apache.org/jira/browse/SPARK-19677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876747#comment-15876747 ] Heji Kim commented on SPARK-19677: -- Thank you for reporting this issue! I just wanted to add that we get the same HDFS error when we restart our structured streaming drivers, and also on the very first run, without any restart, when we run a more complex driver using withWatermark/agg/groupBy/orderBy:
java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id = (op=0, part=15), dir = /user/spark/checkpoints/StructuredStreamingSignalAggregation/ss_StructuredStreamingSignalAggregation/state/0/15]
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:162)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:138)
at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:123)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323).
> HDFSBackedStateStoreProvider fails to overwrite existing file > - > > Key: SPARK-19677 > URL: https://issues.apache.org/jira/browse/SPARK-19677 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Roberto Agostino Vitillo >Priority: Critical > > I got the exception below after restarting a crashed Structured Streaming > application. This seems to be due to the fact that > {{/tmp/checkpoint/state/0/0/214451.delta}} already exists in HDFS. > {code} > 17/02/20 14:14:26 ERROR StreamExecution: Query [id = > 5023231c-2433-4013-a8b9-d54bb5751445, runId = > 4168cf31-7d0b-4435-9b58-28919abd937b] terminated with error > org.apache.spark.SparkException: Job aborted. 
> at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:147) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:78) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$cla
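For context, below is a minimal sketch (not the reporter's actual job; the broker address, topic name, and paths are placeholder assumptions, and the Kafka source additionally needs the spark-sql-kafka-0-10 artifact) of the kind of watermarked aggregation described in the comment above, whose per-batch state commits go through HDFSBackedStateStore and produce the .delta files under the checkpoint's state/ directory:
{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

public final class SignalAggregationSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("StructuredStreamingSignalAggregation")
                .getOrCreate();

        // Placeholder Kafka source; broker and topic are assumptions.
        Dataset<Row> signals = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092")
                .option("subscribe", "signals")
                .load()
                .selectExpr("CAST(value AS STRING) AS value", "timestamp");

        // Watermarked windowed aggregation: every micro-batch commits a new
        // version of the state store, writing a delta file per partition
        // under <checkpointLocation>/state/0/<partition>/.
        Dataset<Row> counts = signals
                .withWatermark("timestamp", "10 minutes")
                .groupBy(window(col("timestamp"), "5 minutes"))
                .count();

        StreamingQuery query = counts.writeStream()
                .outputMode(OutputMode.Append())
                .format("parquet")
                .option("path", "/user/spark/output/signal_counts")
                .option("checkpointLocation", "/user/spark/checkpoints/StructuredStreamingSignalAggregation")
                .start();
        query.awaitTermination();
    }
}
{code}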
[jira] [Resolved] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic when kafka-clients 0.10.0.1 is used
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim resolved SPARK-18506. -- Resolution: Not A Problem Just another library incompatibility issue. We just downgraded kafka-clients to 0.10.0.1 from 0.10.1.0.
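As a side note, one way to check the client behaviour outside of Spark is a bare kafka-clients consumer such as the sketch below (class name, arguments, and the fresh group id are illustrative assumptions); building it once against kafka-clients 0.10.0.1 and once against 0.10.1.0 on the same multi-partition topic shows whether the earliest reset is applied to every partition:
{code:java}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class EarliestResetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", args[0]);        // e.g. broker:9092
        props.put("group.id", "earliest-reset-check");  // fresh group so the reset policy applies
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(args[1])); // topic under test
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // Print how many records each partition returned in this poll.
                records.partitions().forEach(tp ->
                        System.out.println(tp + " -> " + records.records(tp).size() + " records"));
            }
        }
    }
}
{code}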
[jira] [Updated] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic when kafka-clients 0.10.1.0 is used
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim updated SPARK-18506: - Summary: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic when kafka-clients 0.10.1.0 is used (was: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic when kafka-clients 0.10.0.1 is used) > kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a > single partition on a multi partition topic when kafka-clients 0.10.1.0 is > used > --- > > Key: SPARK-18506 > URL: https://issues.apache.org/jira/browse/SPARK-18506 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.2 > Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark > standalone mode 2.0.2 > with Kafka 0.10.1.0. >Reporter: Heji Kim > > Our team is trying to upgrade to Spark 2.0.2/Kafka > 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our > drivers to read all partitions of a single stream when kafka > auto.offset.reset=earliest running on a real cluster(separate VM nodes). > When we run our drivers with auto.offset.reset=latest ingesting from a single > kafka topic with multiple partitions (usually 10 but problem shows up with > only 3 partitions), the driver reads correctly from all partitions. > Unfortunately, we need "earliest" for exactly once semantics. > In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using > spark-streaming-kafka-0-8_2.11 with the prior setting > auto.offset.reset=smallest runs correctly. > We have tried the following configurations in trying to isolate our problem > but it is only auto.offset.reset=earliest on a "real multi-machine cluster" > which causes this problem. > 1. Ran with spark standalone cluster(4 Debian nodes, 8vCPU/30GB each) > instead of YARN 2.7.3. Single partition read problem persists both cases. > Please note this problem occurs on an actual cluster of separate VM nodes > (but not when our engineer runs in as a cluster on his own Mac.) > 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists. > 3. Turned off checkpointing. Problem persists with or without checkpointing. > 4. Turned off backpressure. Problem persists with or without backpressure. > 5. Tried both partition.assignment.strategy RangeAssignor and > RoundRobinAssignor. Broken with both. > 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with > both. > 7. Tried the simplest scala driver that only logs. (Our team uses java.) > Broken with both. > 8. Tried increasing GCE capacity for cluster but already we were highly > overprovisioned for cores and memory. Also tried ramping up executors and > cores. Since driver works with auto.offset.reset=latest, we have ruled out > GCP cloud infrastructure issues. > When we turn on the debug logs, we sometimes see partitions being set to > different offset configuration even though the consumer config correctly > indicates auto.offset.reset=earliest. > {noformat} > 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. > (org.apache.kafka.clients.consumer.internals.Fetcher) > 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
> (org.apache.kafka.clients.consumer.internals.Fetcher) > 8 TRACE Sending ListOffsetRequest > {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} > to broker 10.102.20.12:9092 (id: 12 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > 9 TRACE Sending ListOffsetRequest > {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} > to broker 10.102.20.13:9092 (id: 13 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > 8 TRACE Received ListOffsetResponse > {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} > from broker 10.102.20.12:9092 (id: 12 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > 9 TRACE Received ListOffsetResponse > {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} > from broker 10.102.20.13:9092 (id: 13 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 > (org.apache.kafka.clients.consumer.internals.Fetcher) > 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 > (org.apache.kafka.clients.consumer.internals.Fetcher) > {noformat} > I've enclosed below the compl
[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15713476#comment-15713476 ] Heji Kim commented on SPARK-18506: -- Breaking news: I finally found the source of the problem. Our driver jars have a lot of dependencies, and we also include the kafka-clients jar along with spark-streaming_2.11 (2.0.2). Our data architect says our code uses it: org.apache.kafka:kafka-clients:0.10.1.0. If I downgrade kafka-clients to 0.10.0.1, "earliest" works exactly as expected. (I'll update the issue name with this jar name...)
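Given how easily an application jar can drag in its own kafka-clients next to the one spark-streaming-kafka-0-10 expects, a small diagnostic along these lines (the class name is made up for illustration) can confirm which kafka-clients jar actually wins on the driver or executor classpath:
{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.AppInfoParser;

public final class KafkaClientVersionCheck {
    public static void main(String[] args) {
        // Version string baked into whichever kafka-clients jar was loaded.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        // Which jar on the classpath supplied the consumer class (may be null under some classloaders).
        System.out.println("loaded from: "
                + KafkaConsumer.class.getProtectionDomain().getCodeSource().getLocation());
    }
}
{code}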
[jira] [Updated] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic when kafka-clients 0.10.0.1 is used
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim updated SPARK-18506: - Summary: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic when kafka-clients 0.10.0.1 is used (was: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic)
[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709430#comment-15709430 ] Heji Kim commented on SPARK-18506: -- Hi Cody. I have tried a roughly similar configuration on GCP with ZK/broker on a single instance and I still have this single partition issue. I have looked at every single config setting. Which OS are you on? Is it Amazon Linux? Are you using the spark-ec2 script? I will try to retest closer to your setting. Heji
[jira] [Reopened] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim reopened SPARK-18506: -- My team has asked me to reopen this issue to see if there can be any more progress. We have implemented a workaround for some of our drivers in which we manually manage all our checkpointing and partition assignment in Cassandra. However, we would like to use the out-of-the-box "earliest" offset and checkpointing for simpler use cases and for comparison in performance testing.
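For readers hitting the same wall, here is a rough, self-contained sketch of that style of workaround; the in-memory map is a hypothetical stand-in for the Cassandra table, and the method names are made up. The idea is to record the processed OffsetRanges after each batch and, on restart, hand them back through ConsumerStrategies.Assign so that auto.offset.reset is never consulted:
{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.*;

public final class ManualOffsetSketch {
    // Hypothetical stand-in for an external offset table (e.g. a Cassandra keyspace).
    private static final Map<TopicPartition, Long> OFFSET_STORE = new HashMap<>();

    // Record how far each partition got after every batch.
    static <K, V> void trackOffsets(JavaInputDStream<ConsumerRecord<K, V>> stream) {
        stream.foreachRDD(rdd -> {
            OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // ... process the batch here, then persist the end offsets ...
            for (OffsetRange r : ranges) {
                OFFSET_STORE.put(new TopicPartition(r.topic(), r.partition()), r.untilOffset());
            }
        });
    }

    // On restart, resume from the stored offsets with an explicit assignment,
    // so the consumer never falls back to auto.offset.reset.
    static ConsumerStrategy<String, String> resumeStrategy(Map<String, Object> kafkaParams) {
        return ConsumerStrategies.<String, String>Assign(OFFSET_STORE.keySet(), kafkaParams, OFFSET_STORE);
    }
}
{code}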
[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15703328#comment-15703328 ] Heji Kim commented on SPARK-18506: -- Hi Cody, I am putting one last-ditch effort into getting this to work. Could you send me more details about your test setup? Spark cluster: exact number of EC2 instances with instance type? Is it three machines, i.e. one separate master and two separate nodes? Kafka cluster: exact number of EC2 instances with instance type? Thanks, Heji
[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15691582#comment-15691582 ] Heji Kim commented on SPARK-18506: -- My ansible automation just downloads the official distributions, unzips them, and changes the minimal set of configuration defining the cluster (1 master, 3 slaves each). I did use the exact same ansible plays for both AWS/GCE on Debian 8. Either there is something wrong with our cluster configuration or there is a bug somewhere. I thought the Kafka 0.10.1/YARN/Spark cluster was fine since we are able to run our legacy driver (0.8 consumer) on it. Also, I can run the Kafka-shipped performance tests without problems. On Monday I'm going to go through each server/slave/consumer setting again with my Spark guy, but we have grappled with this mystery for almost 3 weeks now and I am ready to just give up on "earliest" since Assign works. Next month we have to start migrating the entire stack to Kubernetes, so we will see if we run into this issue again. Have a fine weekend. Heji
[jira] [Resolved] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim resolved SPARK-18506. -- Resolution: Workaround I was able to replicate the single-partition-only problem in AWS with both YARN and Spark standalone. Disappointingly, the AWS setup fails identically to our GCE setup. Our base Kafka/YARN/standalone configuration is fairly vanilla (fully automated with ansible), without much configuration beyond the defaults, so there must be some configuration we need in Kafka 0.10.1 that we did not need in 0.9. Since ConsumerStrategy.Assign works and we will have to implement our own checkpoint/offset management (for exactly-once semantics and also replay functionality), we will avoid the "earliest" offset for now. Thank you for all your work and happy Thanksgiving. Heji
[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15684995#comment-15684995 ] Heji Kim commented on SPARK-18506: -- Just confirming that when I use ConsumerStrategy.Assign with all partitions starting at 0, everything works as expected.
[2016-11-21 22:46:15,016] INFO OFFSET: null KafkaRDD[4] at createDirectStream at SimpleKafkaLoggingDriverAssignedOffset.java:62 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 3 1169577 1174129 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 0 1169615 1174109 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 1 1169561 1174125 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 2 1169567 1174132 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 4 1169628 1174202 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
Code below.
package com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka;

import java.util.*;
import org.apache.commons.collections.map.HashedMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;

/* Simplest possible java driver that assigns starting offset */
public final class SimpleKafkaLoggingDriverAssignedOffset {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaLoggingDriverAssignedOffset.class);
    public static final String APP_NAME = "TEST_ASSIGNED_OFFSET";

    public static void main(final String[] args) throws Exception {
        if (args.length != 4) {
            throw new IllegalArgumentException("Driver passed in incorrect parameters" +
                    "Usage: SimpleKafkaLoggingDriverAssignedOffset ");
        }
        String kafka_topic = args[1];
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", args[0]);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", args[2]);
        // kafkaParams.put("auto.offset.reset", args[3]);
        kafkaParams.put("enable.auto.commit", false);
        Collection<String> topics = Arrays.asList(kafka_topic);
        final SparkConf sparkConfiguration = new SparkConf().setAppName(APP_NAME + "_" + args[1]);
        int totalPartitions = Integer.valueOf(args[3]);
        // create the streaming context
        final JavaStreamingContext streamingContext =
                new JavaStreamingContext(sparkConfiguration, Durations.seconds(Integer.valueOf(args[3])));
        // force log
        streamingContext.ssc().sc().setLogLevel("DEBUG");
        // assign fixed topic partitions starting at 0
        final Map<TopicPartition, Long> partitionStart = new HashedMap();
        for (int i = 0; i < totalPartitions; i++) {
            partitionStart.put(new TopicPartition(kafka_topic, i), 0L);
        }
        final ConsumerStrategy<String, String> fixedAssignment =
                ConsumerStrategies.<String, String>Assign(partitionStart.keySet(), kafkaParams, partitionStart);
        final JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(
                streamingContext, LocationStrategies.PreferConsistent(), fixedAssignment);
        directStream.foreachRDD(rdd -> {
            LOGGER.info("OFFSET: " + rdd.rdd().name() + " " + rdd.toString());
            for (final OffsetRange offset : ((HasOffsetRanges) rdd.rdd()).offsetRanges()) {
                LOGGER.info("OFFSET: " + offset.topic() + ' ' + offset.partition() + ' '
                        + offset.fromOffset() + ' ' + offset.untilOffset());
            }
        });
        // Start the streaming context and await termination
        LOGGER.info("KCP: starting SimpleKafkaLoggingDriverAssignedOffset Driver with master URL >{}<",
                streamingContext.sparkContext().master());
        streamingContext.start();
        LOGGER.info("KCP: spark state: {}", streamingContext.getState().name());
        streamingContext.awaitTermination();
    }
}
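For contrast, the subscribe-based variant that the rest of this thread describes as failing with auto.offset.reset=earliest differs from the driver above only in how the consumer strategy is built. This is only a sketch: it reuses kafkaParams, topics, and streamingContext from the code above and leaves the starting position to Kafka's group management and the reset policy.
{code:java}
// Subscribe variant (sketch): let auto.offset.reset decide the starting position
// instead of assigning explicit offsets per partition.
kafkaParams.put("auto.offset.reset", "earliest");
final ConsumerStrategy<String, String> subscription =
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams);
final JavaInputDStream<ConsumerRecord<String, String>> subscribedStream = KafkaUtils.createDirectStream(
        streamingContext, LocationStrategies.PreferConsistent(), subscription);
{code}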
[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15684844#comment-15684844 ] Heji Kim commented on SPARK-18506: -- Firstly, thank you Cody for the quick response. Our intention was not to make anyone work on a weekend, and I will remember not to report bugs on a Friday. I am the devops engineer doing the deployment automation, but our Spark Java guy Michael did create a driver that custom-managed the Kafka offsets in Cassandra to see if it would fix the problem. It appeared not to fix the issue. He is on vacation for a week, so it will take some time for me to get a ConsumerStrategy.Assign working. Also, I will try to get clusters running on AWS this week. If you think I should report a bug against the Kafka consumer, I can do that as well. Thanks again, Heji
[jira] [Commented] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15679667#comment-15679667 ] Heji Kim commented on SPARK-18506: -- We have spent two weeks trying different configurations and stripping everything down. The only thing we have not tried is a different cloud provider; we are using GCE. Since previous versions work properly, as does the "latest" offset setting, we did not think the problem was in the infrastructure layer. Where does Databricks do its Spark cluster regression testing? I thought it might be AWS. If you have a working multi-partition example that has been tested on an actual cluster that you use for regression testing, we would be grateful for any pointers. We have upgraded our drivers since Spark 1.2 (partly on AWS, and GCP/GCE since 1.6) and this is the first time we have had such a blocker. I do want the Spark team to know that our team tried its absolute best to verify that there was nothing wrong with our system configuration and spent well over 100 hours before posting this issue. > kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a > single partition on a multi partition topic > --- > > Key: SPARK-18506 > URL: https://issues.apache.org/jira/browse/SPARK-18506 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.2 > Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark > standalone mode 2.0.2 > with Kafka 0.10.1.0. >Reporter: Heji Kim > > Our team is trying to upgrade to Spark 2.0.2/Kafka > 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our > drivers to read all partitions of a single stream when kafka > auto.offset.reset=earliest running on a real cluster(separate VM nodes). > When we run our drivers with auto.offset.reset=latest ingesting from a single > kafka topic with multiple partitions (usually 10 but problem shows up with > only 3 partitions), the driver reads correctly from all partitions. > Unfortunately, we need "earliest" for exactly once semantics. > In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using > spark-streaming-kafka-0-8_2.11 with the prior setting > auto.offset.reset=smallest runs correctly. > We have tried the following configurations in trying to isolate our problem > but it is only auto.offset.reset=earliest on a "real multi-machine cluster" > which causes this problem. > 1. Ran with spark standalone cluster(4 Debian nodes, 8vCPU/30GB each) > instead of YARN 2.7.3. Single partition read problem persists both cases. > Please note this problem occurs on an actual cluster of separate VM nodes > (but not when our engineer runs in as a cluster on his own Mac.) > 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists. > 3. Turned off checkpointing. Problem persists with or without checkpointing. > 4. Turned off backpressure. Problem persists with or without backpressure. > 5. Tried both partition.assignment.strategy RangeAssignor and > RoundRobinAssignor. Broken with both. > 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with > both. > 7. Tried the simplest scala driver that only logs. (Our team uses java.) > Broken with both. > 8. Tried increasing GCE capacity for cluster but already we were highly > overprovisioned for cores and memory. Also tried ramping up executors and > cores. Since driver works with auto.offset.reset=latest, we have ruled out > GCP cloud infrastructure issues. 
> When we turn on the debug logs, we sometimes see partitions being set to > different offset configuration even though the consumer config correctly > indicates auto.offset.reset=earliest. > {noformat} > 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. > (org.apache.kafka.clients.consumer.internals.Fetcher) > 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. > (org.apache.kafka.clients.consumer.internals.Fetcher) > 8 TRACE Sending ListOffsetRequest > {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} > to broker 10.102.20.12:9092 (id: 12 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > 9 TRACE Sending ListOffsetRequest > {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} > to broker 10.102.20.13:9092 (id: 13 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > 8 TRACE Received ListOffsetResponse > {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} > from broker 10.102.20.12:9092 (id: 12 rack: null) > (org.apache.kafka.clients.consumer.int
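For reference, in the ListOffsetRequest entries above a timestamp of -2 asks the broker for the earliest offset and -1 for the latest, which is consistent with the mixed per-partition resets shown in the DEBUG lines. One way to check the broker-side earliest offsets independently of Spark is to query them with a plain KafkaConsumer; a rough diagnostic sketch (the broker list and group id are placeholders, not values from this issue):

{code}
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "10.102.20.10:9092")   // placeholder broker list
props.put("group.id", "offset-diagnostic")            // placeholder group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val parts = consumer.partitionsFor("simple_test").asScala
  .map(p => new TopicPartition(p.topic, p.partition))

// Manually assign every partition, seek to the beginning, and print the positions.
consumer.assign(parts.asJava)
consumer.seekToBeginning(parts.asJava)
parts.foreach(tp => println(s"$tp earliest=${consumer.position(tp)}"))
consumer.close()
{code}

If every partition reports a sane earliest position here but the Spark stream still starts most partitions at the latest offset, that points at the integration layer rather than the brokers.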
[jira] [Updated] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim updated SPARK-18506: - Description: Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our drivers to read all partitions of a single stream when kafka auto.offset.reset=earliest running on a real cluster(separate VM nodes). When we run our drivers with auto.offset.reset=latest ingesting from a single kafka topic with multiple partitions (usually 10 but problem shows up with only 3 partitions), the driver reads correctly from all partitions. Unfortunately, we need "earliest" for exactly once semantics. In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using spark-streaming-kafka-0-8_2.11 with the prior setting auto.offset.reset=smallest runs correctly. We have tried the following configurations in trying to isolate our problem but it is only auto.offset.reset=earliest on a "real multi-machine cluster" which causes this problem. 1. Ran with spark standalone cluster(4 Debian nodes, 8vCPU/30GB each) instead of YARN 2.7.3. Single partition read problem persists both cases. Please note this problem occurs on an actual cluster of separate VM nodes (but not when our engineer runs in as a cluster on his own Mac.) 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists. 3. Turned off checkpointing. Problem persists with or without checkpointing. 4. Turned off backpressure. Problem persists with or without backpressure. 5. Tried both partition.assignment.strategy RangeAssignor and RoundRobinAssignor. Broken with both. 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with both. 7. Tried the simplest scala driver that only logs. (Our team uses java.) Broken with both. 8. Tried increasing GCE capacity for cluster but already we were highly overprovisioned for cores and memory. Also tried ramping up executors and cores. Since driver works with auto.offset.reset=latest, we have ruled out GCP cloud infrastructure issues. When we turn on the debug logs, we sometimes see partitions being set to different offset configuration even though the consumer config correctly indicates auto.offset.reset=earliest. {noformat} 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher) 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
(org.apache.kafka.clients.consumer.internals.Fetcher) 8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher) 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher) {noformat} I've enclosed below the completely stripped down trivial test driver that shows this behavior. After spending 2 weeks trying all combinations with a really stripped down driver, we think either there might be a bug in the kafka spark integration or if the kafka 0.10/spark upgrade needs special configuration, it should be fantastic if it was clearer in the documentation. But currently we cannot upgrade. {code} package com.x.labs.analytics.diagnostics.spark.drivers import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies import org.apache.spark.streaming.kafka010.ConsumerStrategies /** * * This driver is only for pulling data from the stream and logging to output just to isolate single partition bug */ object SimpleKafkaLoggingDriver { def main(args: Array[String]) { if (args.length != 4) { System.err.println("Usage: SimpleTestDriver ") System.exit(1) } val Array(brokers, topic, groupId, offsetReset
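The {code} block above is cut off by the mailing-list digest (the argument placeholders in the usage string were also stripped). For completeness, here is a self-contained sketch of a logging-only driver along the same lines; the batch interval, the reconstructed usage message, and the enable.auto.commit setting are assumptions rather than the reporter's original source:

{code}
package com.x.labs.analytics.diagnostics.spark.drivers

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

// Logging-only driver: subscribes to a single topic and prints the offset range
// consumed from every partition in every batch.
object SimpleKafkaLoggingDriver {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: SimpleKafkaLoggingDriver <brokers> <topic> <groupId> <offsetReset>")
      System.exit(1)
    }
    val Array(brokers, topic, groupId, offsetReset) = args
    val preferredHosts = LocationStrategies.PreferConsistent

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> offsetReset,
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val ssc = new StreamingContext(new SparkConf().setAppName("SimpleKafkaLoggingDriver"), Seconds(5))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams))

    // With auto.offset.reset=earliest every partition should start from offset 0,
    // not just one of them; the printed OffsetRanges make that easy to verify.
    stream.foreachRDD { rdd =>
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}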
[jira] [Updated] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim updated SPARK-18506: - Description: Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our drivers to read all partitions of a single stream when kafka auto.offset.reset=earliest. When we run our drivers with auto.offset.reset=latest ingesting from a single kafka topic with multiple partitions (usually 10 but problem shows up with only 3 partitions), the driver reads correctly from all partitions. Unfortunately, we need "earliest" for exactly once semantics. In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using spark-streaming-kafka-0-8_2.11 with the prior setting auto.offset.reset=smallest runs correctly. We have tried the following configurations in trying to isolate our problem but it is only auto.offset.reset=earliest on a "real multi-machine cluster" which causes this problem. 1. Ran with spark standalone cluster(4 Debian nodes, 8vCPU/30GB each) instead of YARN 2.7.3. Single partition read problem persists both cases. Please note this problem occurs on an actual cluster of separate VM nodes (but not when our engineer runs in as a cluster on his own Mac.) 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists. 3. Turned off checkpointing. Problem persists with or without checkpointing. 4. Turned off backpressure. Problem persists with or without backpressure. 5. Tried both partition.assignment.strategy RangeAssignor and RoundRobinAssignor. Broken with both. 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with both. 7. Tried the simplest scala driver that only logs. (Our team uses java.) Broken with both. 8. Tried increasing GCE capacity for cluster but already we were highly overprovisioned for cores and memory. Also tried ramping up executors and cores. Since driver works with auto.offset.reset=latest, we have ruled out GCP cloud infrastructure issues. When we turn on the debug logs, we sometimes see partitions being set to different offset configuration even though the consumer config correctly indicates auto.offset.reset=earliest. {noformat} 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher) 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
(org.apache.kafka.clients.consumer.internals.Fetcher) 8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher) 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher) {noformat} I've enclosed below the completely stripped down trivial test driver that shows this behavior. After spending 2 weeks trying all combinations with a really stripped down driver, we think either there might be a bug in the kafka spark integration or if the kafka 0.10/spark upgrade needs special configuration, it should be fantastic if it was clearer in the documentation. But currently we cannot upgrade. {code} package com.x.labs.analytics.diagnostics.spark.drivers import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies import org.apache.spark.streaming.kafka010.ConsumerStrategies /** * * This driver is only for pulling data from the stream and logging to output just to isolate single partition bug */ object SimpleKafkaLoggingDriver { def main(args: Array[String]) { if (args.length != 4) { System.err.println("Usage: SimpleTestDriver ") System.exit(1) } val Array(brokers, topic, groupId, offsetReset) = args val preferredHosts = LocationStr
[jira] [Updated] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim updated SPARK-18506: - Description: Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our drivers to read all partitions of a single stream when kafka auto.offset.reset=earliest. When we run our drivers with auto.offset.reset=latest ingesting from a single kafka topic with multiple partitions (usually 10 but problem shows up with only 3 partitions), the driver reads correctly from all partitions. Unfortunately, we need "earliest" for exactly once semantics. In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using spark-streaming-kafka-0-8_2.11 with the prior setting auto.offset.reset=smallest runs correctly. We have tried the following configurations in trying to isolate our problem but it is only auto.offset.reset=earliest which causes this problem. 1. Ran with spark standalone cluster instead of YARN 2.7.3. Single partition read problem persists both cases. 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists. 3. Turned off checkpointing. Problem persists with or without checkpointing. 4. Turned off backpressure. Problem persists with or without backpressure. 5. Tried both partition.assignment.strategy RangeAssignor and RoundRobinAssignor. Broken with both. 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with both. 7. Tried the simplest scala driver that only logs. (Our team uses java.) Broken with both. 8. Tried increasing GCE capacity for cluster but already we were highly overprovisioned for cores and memory. Also tried ramping up executors and cores. Since driver works with auto.offset.reset=latest, we have ruled out GCP cloud infrastructure issues. When we turn on the debug logs, we sometimes see partitions being set to different offset configuration even though the consumer config correctly indicates auto.offset.reset=earliest. {noformat} 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher) 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
(org.apache.kafka.clients.consumer.internals.Fetcher) 8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher) 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher) {noformat} I've enclosed below the completely stripped down trivial test driver that shows this behavior. After spending 2 weeks trying all combinations with a really stripped down driver, we think either there might be a bug in the kafka spark integration or if the kafka 0.10/spark upgrade needs special configuration, it should be fantastic if it was clearer in the documentation. But currently we cannot upgrade. {code} package com.x.labs.analytics.diagnostics.spark.drivers import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies import org.apache.spark.streaming.kafka010.ConsumerStrategies /** * * This driver is only for pulling data from the stream and logging to output just to isolate single partition bug */ object SimpleKafkaLoggingDriver { def main(args: Array[String]) { if (args.length != 4) { System.err.println("Usage: SimpleTestDriver ") System.exit(1) } val Array(brokers, topic, groupId, offsetReset) = args val preferredHosts = LocationStrategies.PreferConsistent val topics = List(topic) val kafkaParams = Map( "bootstrap.servers" -> brokers, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classO
[jira] [Created] (SPARK-18506) kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic
Heji Kim created SPARK-18506: Summary: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic Key: SPARK-18506 URL: https://issues.apache.org/jira/browse/SPARK-18506 Project: Spark Issue Type: Bug Components: DStreams Affects Versions: 2.0.2 Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark standalone mode 2.0.2 with Kafka 0.10.1.0. Reporter: Heji Kim Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2) and we cannot get our drivers to read all partitions of a single stream when kafka auto.offset.reset=earliest. When we run our drivers with auto.offset.reset=latest ingesting from a single kafka topic with multiple partitions (usually 10 but problem shows up with only 3 partitions), the driver reads correctly from all partitions. Unfortunately, we need "earliest" for exactly once semantics. In the same kafka 0.10.1.0/spark 2.x setup, our legacy driver using spark-streaming-kafka-0-8_2.11 with the prior setting auto.offset.reset=smallest runs correctly. We have tried the following configurations in trying to isolate our problem but it is only auto.offset.reset=earliest which causes this problem. 1. Ran with spark standalone cluster instead of YARN 2.7.3. Single partition read problem persists both cases. 2. Ran with spark 2.1 nightly build for the last 10 days. Problem persists. 3. Turned off checkpointing. Problem persists with or without checkpointing. 4. Turned off backpressure. Problem persists with or without backpressure. 5. Tried both partition.assignment.strategy RangeAssignor and RoundRobinAssignor. Broken with both. 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with both. 7. Tried the simplest scala driver that only logs. (Our team uses java.) Broken with both. 8. Tried increasing GCE capacity for cluster but already we were highly overprovisioned for cores and memory. Also tried ramping up executors and cores. Since driver works with auto.offset.reset=latest, we have ruled out GCP cloud infrastructure issues. When we turn on the debug logs, we sometimes see partitions being set to different offset configuration even though the consumer config correctly indicates auto.offset.reset=earliest. {noformat} 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher) 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. 
(org.apache.kafka.clients.consumer.internals.Fetcher) 8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher) 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher) {noformat} I've enclosed below the completely stripped down trivial test driver that shows this behavior. Any insight would be greatly appreciated. {code} package com.x.labs.analytics.diagnostics.spark.drivers import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies import org.apache.spark.streaming.kafka010.ConsumerStrategies /** * * This driver is only for pulling data from the stream and logging to output just to isolate single partition bug */ object SimpleKafkaLoggingDriver { def main(args: Array[String]) { if (args.length != 4) { System.err.println("Usage: SimpleTestDriver ") System.exit(1) } val Array(brokers, topic, groupId, offsetReset) = args val preferredHosts = LocationStrategies.PreferConsistent val topics = List(topic) val kafkaParams = Map( "bootstrap.servers" -> br
[jira] [Closed] (SPARK-11613) Kinesis ASL should allow caller to set ClientConfiguration for socket timeouts and other connection setting
[ https://issues.apache.org/jira/browse/SPARK-11613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim closed SPARK-11613. Resolution: Fixed > Kinesis ASL should allow caller to set ClientConfiguration for socket > timeouts and other connection setting > --- > > Key: SPARK-11613 > URL: https://issues.apache.org/jira/browse/SPARK-11613 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 1.5.1 >Reporter: Heji Kim > > Currently the Kinesis ASL does not allow configuration of Kinesis settings > such as maxRecords fetched per getRecords or the connection settings on > AmazonKinesisClient such as the socket timeout. > Currently we get intermittent multiple SocketTimeoutExceptions on restart > from Kinesis, which holds up the driver processing (2-10 minutes) and introduces > a pileup. After we engaged AWS support, they asked us to lower the > maxRecords per getRecords to 1,000 (the current default is 10,000). There appeared > to be no way to configure this externally, so I made a custom build and this > resolved some of our issues. > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-11613) Kinesis ASL should allow caller to set ClientConfiguration for socket timeouts and other connection setting
[ https://issues.apache.org/jira/browse/SPARK-11613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim updated SPARK-11613: - Description: Currently the Kinesis ASL does not allow configuration of Kinesis settings such as maxRecords fetched per getRecords or the connection settings on AmazonKinesisClient such as the socket timeout. Currently we get intermittent multiple SocketTimeoutExceptions on restart from Kinesis which holds up the driver processing(2-10 minutes) and introduce a pileup. After we engaged AWS support, they asked us to lower the maxRecords per getRecords to 1,000- current default is 10,000. There appeared no way to configure this externally so I made a custom build and this resolved some of our issues. was: Currently the Kinesis ASL does not allow configuration of connection settings on AmazonKinesisClient such as the socket timeout or the maxRecords fetched per getRecords. Currently we get intermittent 2 minute SocketTimeoutExceptions from Kinesis which holds up the driver processing and pile up batch processing. We'd like to set it to something closer to the batch polling time of 5 seconds. The requests are normally subsecond so we'd rather fail fast. KinesisBackedBlockRDD.KinesisSequenceRangeIterator uses only the credential based API to create AmazonKinesisClient and so we don't have access to com.amazonaws.ClientConfiguration which is the only API AWS provides for setting connection settings. Summary: Kinesis ASL should allow caller to set ClientConfiguration for socket timeouts and other connection setting (was: Kinesis ASL should allow caller to set ClientConfiguration for socket timeouts and other connection settings) > Kinesis ASL should allow caller to set ClientConfiguration for socket > timeouts and other connection setting > --- > > Key: SPARK-11613 > URL: https://issues.apache.org/jira/browse/SPARK-11613 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.1 >Reporter: Heji Kim > > Currently the Kinesis ASL does not allow configuration of Kinesis settings > such as maxRecords fetched per getRecords or the connection settings on > AmazonKinesisClient such as the socket timeout. > Currently we get intermittent multiple SocketTimeoutExceptions on restart > from Kinesis which holds up the driver processing(2-10 minutes) and introduce > a pileup. After we engaged AWS support, they asked us to lower the > maxRecords per getRecords to 1,000- current default is 10,000. There appeared > no way to configure this externally so I made a custom build and this > resolved some of our issues. > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-11613) Kinesis ASL should allow caller to set ClientConfiguration for socket timeouts and other connection settings
[ https://issues.apache.org/jira/browse/SPARK-11613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim updated SPARK-11613: - Description: Currently the Kinesis ASL does not allow configuration of connection settings on AmazonKinesisClient such as the socket timeout or the maxRecords fetched per getRecords. Currently we get intermittent 2 minute SocketTimeoutExceptions from Kinesis which holds up the driver processing and pile up batch processing. We'd like to set it to something closer to the batch polling time of 5 seconds. The requests are normally subsecond so we'd rather fail fast. KinesisBackedBlockRDD.KinesisSequenceRangeIterator uses only the credential based API to create AmazonKinesisClient and so we don't have access to com.amazonaws.ClientConfiguration which is the only API AWS provides for setting connection settings. was: Currently the Kinesis ASL does not allow configuration of connection settings on AmazonKinesisClient such as the socket timeout. Currently we get intermittent 2 minute SocketTimeoutExceptions from Kinesis which holds up the driver processing and pile up batch processing. We'd like to set it to something closer to the batch polling time of 5 seconds. The requests are normally subsecond so we'd rather fail fast. KinesisBackedBlockRDD.KinesisSequenceRangeIterator uses only the credential based API to create AmazonKinesisClient and so we don't have access to com.amazonaws.ClientConfiguration which is the only API AWS provides for setting connection settings. > Kinesis ASL should allow caller to set ClientConfiguration for socket > timeouts and other connection settings > > > Key: SPARK-11613 > URL: https://issues.apache.org/jira/browse/SPARK-11613 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.1 >Reporter: Heji Kim > > Currently the Kinesis ASL does not allow configuration of connection settings > on AmazonKinesisClient such as the socket timeout or the maxRecords fetched > per getRecords. > Currently we get intermittent 2 minute SocketTimeoutExceptions from Kinesis > which holds up the driver processing and pile up batch processing. > We'd like to set it to something closer to the batch polling time of 5 > seconds. The requests are normally subsecond so we'd rather fail fast. > KinesisBackedBlockRDD.KinesisSequenceRangeIterator uses only the credential > based API to create AmazonKinesisClient and so we don't have access to > com.amazonaws.ClientConfiguration which is the only API AWS provides for > setting connection settings. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-11613) Kinesis ASL should allow caller to set ClientConfiguration for socket timeouts and other connection settings
[ https://issues.apache.org/jira/browse/SPARK-11613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heji Kim updated SPARK-11613: - Description: Currently the Kinesis ASL does not allow configuration of connection settings on AmazonKinesisClient such as the socket timeout. Currently we get intermittent 2 minute SocketTimeoutExceptions from Kinesis which holds up the driver processing and pile up batch processing. We'd like to set it to something closer to the batch polling time of 5 seconds. The requests are normally subsecond so we'd rather fail fast. KinesisBackedBlockRDD.KinesisSequenceRangeIterator uses only the credential based API to create AmazonKinesisClient and so we don't have access to com.amazonaws.ClientConfiguration which is the only API AWS provides for setting connection settings. was: Currently Kinesis ASL does not allow configuration of connection settings on AmazonKinesisClient such as socket timeout. Currently we get intermittent 2 minute SocketTimeoutExceptions from Kinesis which can hold up the entire driver processing and pile up batch processing. We'd like to set it to something closer to the batch polling time of 5 seconds. The requests are normally subsecond so we'd rather fail fast. KinesisBackedBlockRDD.KinesisSequenceRangeIterator uses only the credential based API to create AmazonKinesisClient and so we don't have access to com.amazonaws.ClientConfiguration which is the only API AWS provides for setting connection settings. > Kinesis ASL should allow caller to set ClientConfiguration for socket > timeouts and other connection settings > > > Key: SPARK-11613 > URL: https://issues.apache.org/jira/browse/SPARK-11613 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.1 >Reporter: Heji Kim > > Currently the Kinesis ASL does not allow configuration of connection settings > on AmazonKinesisClient such as the socket timeout. > Currently we get intermittent 2 minute SocketTimeoutExceptions from Kinesis > which holds up the driver processing and pile up batch processing. We'd like > to set it to something closer to the batch polling time of 5 seconds. The > requests are normally subsecond so we'd rather fail fast. > KinesisBackedBlockRDD.KinesisSequenceRangeIterator uses only the credential > based API to create AmazonKinesisClient and so we don't have access to > com.amazonaws.ClientConfiguration which is the only API AWS provides for > setting connection settings. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-11613) Kinesis ASL should allow caller to set ClientConfiguration for socket timeouts and other connection settings
Heji Kim created SPARK-11613: Summary: Kinesis ASL should allow caller to set ClientConfiguration for socket timeouts and other connection settings Key: SPARK-11613 URL: https://issues.apache.org/jira/browse/SPARK-11613 Project: Spark Issue Type: Improvement Components: Streaming Affects Versions: 1.5.1 Reporter: Heji Kim Currently Kinesis ASL does not allow configuration of connection settings on AmazonKinesisClient such as socket timeout. Currently we get intermittent 2 minute SocketTimeoutExceptions from Kinesis which can hold up the entire driver processing and pile up batch processing. We'd like to set it to something closer to the batch polling time of 5 seconds. The requests are normally subsecond so we'd rather fail fast. KinesisBackedBlockRDD.KinesisSequenceRangeIterator uses only the credential based API to create AmazonKinesisClient and so we don't have access to com.amazonaws.ClientConfiguration which is the only API AWS provides for setting connection settings. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
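For context, the change this issue asks for is easy to express when the client is constructed directly; the Kinesis ASL simply never exposes the hook. A rough sketch, assuming the AWS SDK v1 API of that era (the timeout and retry values are illustrative, not recommendations):

{code}
import com.amazonaws.ClientConfiguration
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient

// Fail fast instead of waiting out the SDK's default socket timeout.
val clientConfig = new ClientConfiguration()
  .withSocketTimeout(5000)        // ms, roughly the batch polling interval
  .withConnectionTimeout(5000)    // ms
  .withMaxErrorRetry(2)

val credentials = new DefaultAWSCredentialsProviderChain()

// The constructor the issue asks the ASL to use: credentials plus a
// caller-supplied ClientConfiguration.
val kinesisClient = new AmazonKinesisClient(credentials, clientConfig)
{code}

The maxRecords half of the request would, as far as I can tell, map to KinesisClientLibConfiguration.withMaxRecords(...) on the receiver side.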
[jira] [Comment Edited] (SPARK-8939) YARN EC2 default setting fails with IllegalArgumentException
[ https://issues.apache.org/jira/browse/SPARK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739578#comment-14739578 ] Heji Kim edited comment on SPARK-8939 at 9/15/15 3:14 AM: -- I was trying to upgrade to 1.5 today and could not submit drivers due to this same error: "Unknown/unsupported param List(--num-executors, 6)". But then I spun up another cluster and the drivers submitted without error... was (Author: hster): I was trying to upgrade to 1.5 today and could not submit drivers due to this error. Please note that I get this error even when I specify the num-executors. If I use either --num-executors 6 or --conf spark.executor.instances=6 I will get the following error message: "Unknown/unsupported param List(--num-executors, 6)" This is preventing us from upgrading to 1.5 unless there is another workaround. > YARN EC2 default setting fails with IllegalArgumentException > > > Key: SPARK-8939 > URL: https://issues.apache.org/jira/browse/SPARK-8939 > Project: Spark > Issue Type: Bug > Components: EC2 >Affects Versions: 1.5.0 >Reporter: Andrew Or > > I just set it up from scratch using the spark-ec2 script. Then I ran > {code} > bin/spark-shell --master yarn > {code} > which failed with > {code} > 15/07/09 03:44:29 ERROR SparkContext: Error initializing SparkContext. > java.lang.IllegalArgumentException: Unknown/unsupported param > List(--num-executors, , --executor-memory, 6154m, --executor-memory, 6154m, > --executor-cores, 2, --name, Spark shell) > {code} > This goes away if I provide `--num-executors`, but we should fix the default. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8939) YARN EC2 default setting fails with IllegalArgumentException
[ https://issues.apache.org/jira/browse/SPARK-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739578#comment-14739578 ] Heji Kim commented on SPARK-8939: - I was trying to upgrade to 1.5 today and could not submit drivers due to this error. Please note that I get this error even when I specify the num-executors. If I use either --num-executors 6 or --conf spark.executor.instances=6 I will get the following error message: "Unknown/unsupported param List(--num-executors, 6)" This is preventing us from upgrading to 1.5 unless there is another workaround. > YARN EC2 default setting fails with IllegalArgumentException > > > Key: SPARK-8939 > URL: https://issues.apache.org/jira/browse/SPARK-8939 > Project: Spark > Issue Type: Bug > Components: EC2 >Affects Versions: 1.5.0 >Reporter: Andrew Or > > I just set it up from scratch using the spark-ec2 script. Then I ran > {code} > bin/spark-shell --master yarn > {code} > which failed with > {code} > 15/07/09 03:44:29 ERROR SparkContext: Error initializing SparkContext. > java.lang.IllegalArgumentException: Unknown/unsupported param > List(--num-executors, , --executor-memory, 6154m, --executor-memory, 6154m, > --executor-cores, 2, --name, Spark shell) > {code} > This goes away if I provide `--num-executors`, but we should fix the default. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org