[jira] [Created] (SPARK-44346) Python worker exited unexpectedly - java.io.EOFException on DataInputStream.readInt - cluster doesn't terminate
Dmitry Goldenberg created SPARK-44346: - Summary: Python worker exited unexpectedly - java.io.EOFException on DataInputStream.readInt - cluster doesn't terminate Key: SPARK-44346 URL: https://issues.apache.org/jira/browse/SPARK-44346 Project: Spark Issue Type: Question Components: Spark Core Affects Versions: 3.3.2 Environment: AWS EMR emr-6.11.0 Spark 3.3.2 pandas 1.3.5 pyarrow 12.0.0 "spark.sql.shuffle.partitions": "210", "spark.default.parallelism": "210", "spark.yarn.stagingDir": "hdfs:///tmp", "spark.sql.adaptive.enabled": "true", "spark.serializer": "org.apache.spark.serializer.KryoSerializer", "spark.sql.execution.arrow.pyspark.enabled": "true", "spark.dynamicAllocation.enabled": "false", "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory", Reporter: Dmitry Goldenberg I am getting the below exception as a WARN. Apparently, a worker crashes. Multiple issues here:
- What is the cause of the crash? Is it something to do with pyarrow, such as a version mismatch?
- Error handling in Spark. The error is too low-level to make sense of. Can it be caught in Spark and dealt with properly?
- The cluster doesn't recover or cleanly terminate. It essentially just hangs. EMR doesn't terminate it either.
Stack traces:
```
23/07/05 22:43:47 WARN TaskSetManager: Lost task 1.0 in stage 81.0 (TID 2761) (ip-10-2-250-114.awsinternal.audiomack.com executor 2): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:592)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:574)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:763)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:955)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at org.apache.spark.sql.execution.AbstractUnsafeExternalRowSorter.sort(AbstractUnsafeExternalRowSorter.java:50)
at org.apache.spark.sql.execution.SortExecBase.$anonfun$doExecute$1(SortExec.scala:346)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:748)
... 26 more
23/07/05 22:43:47 INFO TaskSetManager: Starting task 1.1 in stage 81.0 (TID 2763) (ip-10-2-250-114.awsinternal.audiomack.com, executor 2, partition 1, NODE_LOCAL, 5020 bytes) taskResourceAssignments Map()
23/07/05 23:30:17 INFO TaskSetManager: Finished task 2.0 in stage 81.0 (TID 2762) in 8603522 ms on ip-10-2-250-114.awsinternal.audiomack.com (executor 2) (1/3)
23/07/05 23:39:09 INFO TaskSetManager: Finished task 0.0 in stage 81.0 (TID 2760) in 9135125 ms on ip-10-2-250-114.awsinternal.audiomack.com (executor 2) (2/3)
```
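Editor's note: since the failure surfaces in the Arrow read path between the JVM and the Python worker, one plausible suspect the reporter raises is a version or environment mismatch between the driver-side and executor-side Python installations (pandas/pyarrow). The snippet below is a minimal diagnostic sketch, not taken from the ticket; the app name and partition count are illustrative. It compares the versions the driver sees with the versions the executors' Python workers report:

```python
# Hypothetical debugging sketch (not from the ticket): compare the Python/pandas/pyarrow
# versions on the driver with those seen inside the executor-side Python workers.
# A mismatch between the two environments is one possible cause of a worker crash
# when spark.sql.execution.arrow.pyspark.enabled is true.
import sys

import pandas
import pyarrow
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("env-check").getOrCreate()  # illustrative app name

print("driver:", sys.version_info[:3], pandas.__version__, pyarrow.__version__)

def worker_versions(_):
    # Runs inside the executor's Python worker process.
    import sys as worker_sys
    import pandas as worker_pd
    import pyarrow as worker_pa
    yield (worker_sys.version_info[:3], worker_pd.__version__, worker_pa.__version__)

# A handful of small tasks; collect the distinct version tuples reported by workers.
print("executors:", set(spark.sparkContext.parallelize(range(8), 8)
                        .mapPartitions(worker_versions).collect()))
```

If the executors report different pandas/pyarrow versions than the driver, or fail the imports outright, aligning the environments (for example via an EMR bootstrap action) would be the first thing to try.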
[jira] [Commented] (SPARK-38557) What may be a cause for HDFSMetadataCommitter: Error while fetching MetaData and how to fix or work around this?
[ https://issues.apache.org/jira/browse/SPARK-38557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17507263#comment-17507263 ] Dmitry Goldenberg commented on SPARK-38557: --- Likely a DUP of https://github.com/qubole/kinesis-sql/issues/57. > What may be a cause for HDFSMetadataCommitter: Error while fetching MetaData > and how to fix or work around this? > > > Key: SPARK-38557 > URL: https://issues.apache.org/jira/browse/SPARK-38557 > Project: Spark > Issue Type: Question > Components: Structured Streaming >Affects Versions: 3.1.1 > Environment: Spark 3.1.1 > AWS EMR 6.3.0 > python 3.7.2 >Reporter: Dmitry Goldenberg >Priority: Major > > I'm seeing errors such as the below when executing structured Spark Streaming > app which streams data from AWS Kinesis. > > I've googled the error but can't tell what may be the cause. Is Spark running > out of disk space? something else? > {code:java} > // From the stderr log in EMR > 22/03/15 00:54:00 WARN HDFSMetadataCommitter: Error while fetching MetaData > [attempt = 1] > java.lang.IllegalStateException: > hdfs://ip-10-2-XXX-XXX.awsinternal.acme.com:8020/mnt/tmp/temporary-03b8fecf-32d5-422c-9375-4c3450ed0bb8/sources/0/shard-commit/0 > does not exist > at > org.apache.spark.sql.kinesis.HDFSMetadataCommitter.$anonfun$get$1(HDFSMetadataCommitter.scala:163) > at > org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229) > at > org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:151) > at > org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275) > at > org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$6(MicroBatchExecution.scala:399) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:399) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:128) > at scala.collection.TraversableLike.map(TraversableLike.scala:238) > at scala.collection.TraversableLike.map$(TraversableLike.scala:231) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:382) > at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:613) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:378) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:211) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) > at > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) > at > 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244){code} > -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
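Editor's note: the linked qubole/kinesis-sql issue suggests the shard-commit metadata lives under the query's checkpoint/temporary directory, and the path in the error above is an auto-created temporary directory under /mnt/tmp. Below is a hedged sketch of one commonly suggested mitigation: pointing checkpointLocation at an explicit durable path instead of relying on the temporary directory. The stream name, endpoint, and paths are illustrative, and the option names are those commonly shown for the qubole connector rather than anything confirmed in this ticket.

```python
# Hedged sketch (not from the ticket): keep the Kinesis query's checkpoint/shard-commit
# metadata on a durable HDFS/S3 path instead of an auto-created temporary directory
# under /mnt/tmp, which may be cleaned up out from under the query.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kinesis-stream").getOrCreate()

events = (spark.readStream
          .format("kinesis")                      # qubole/kinesis-sql connector
          .option("streamName", "my-stream")      # illustrative stream name
          .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
          .load())

query = (events.writeStream
         .format("parquet")
         .option("path", "hdfs:///data/kinesis-out")                       # illustrative path
         .option("checkpointLocation", "hdfs:///checkpoints/kinesis-app")  # durable, not /mnt/tmp
         .start())
query.awaitTermination()
```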
[jira] [Created] (SPARK-38557) What may be a cause for HDFSMetadataCommitter: Error while fetching MetaData and how to fix or work around this?
Dmitry Goldenberg created SPARK-38557: - Summary: What may be a cause for HDFSMetadataCommitter: Error while fetching MetaData and how to fix or work around this? Key: SPARK-38557 URL: https://issues.apache.org/jira/browse/SPARK-38557 Project: Spark Issue Type: Question Components: Structured Streaming Affects Versions: 3.1.1 Environment: Spark 3.1.1 AWS EMR 6.3.0 python 3.7.2 Reporter: Dmitry Goldenberg I'm seeing errors such as the below when executing a structured Spark Streaming app which streams data from AWS Kinesis. I've googled the error but can't tell what may be the cause. Is Spark running out of disk space? Something else? {code:java} // From the stderr log in EMR 22/03/15 00:54:00 WARN HDFSMetadataCommitter: Error while fetching MetaData [attempt = 1] java.lang.IllegalStateException: hdfs://ip-10-2-XXX-XXX.awsinternal.acme.com:8020/mnt/tmp/temporary-03b8fecf-32d5-422c-9375-4c3450ed0bb8/sources/0/shard-commit/0 does not exist at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.$anonfun$get$1(HDFSMetadataCommitter.scala:163) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.withRetry(HDFSMetadataCommitter.scala:229) at org.apache.spark.sql.kinesis.HDFSMetadataCommitter.get(HDFSMetadataCommitter.scala:151) at org.apache.spark.sql.kinesis.KinesisSource.prevBatchShardInfo(KinesisSource.scala:275) at org.apache.spark.sql.kinesis.KinesisSource.getOffset(KinesisSource.scala:163) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$6(MicroBatchExecution.scala:399) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:399) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.immutable.Map$Map1.foreach(Map.scala:128) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:382) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:613) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:378) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:211) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244){code} -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36958) Reading of legacy timestamps from Parquet confusing in Spark 3, related config values don't seem working
[ https://issues.apache.org/jira/browse/SPARK-36958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426571#comment-17426571 ] Dmitry Goldenberg commented on SPARK-36958: --- [~hyukjin.kwon] Hi, so which setting out of the below would I need to set to what value, and why? - spark.sql.legacy.parquet.datetimeRebaseModeInRead - spark.sql.legacy.parquet.int96RebaseModeInRead - spark.sql.legacy.parquet.int96RebaseModeInWrite - spark.sql.legacy.timeParserPolicy I'll see about a reproducer though it may take a little while. The first question is, what are these settings and how do they work? Second, why is there an error about "reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z"? We don't have date values like that in the system. > doesn't it work if you set it to LEGACY Again, which values? And why LEGACY and not CORRECTED? > Reading of legacy timestamps from Parquet confusing in Spark 3, related > config values don't seem working > > > Key: SPARK-36958 > URL: https://issues.apache.org/jira/browse/SPARK-36958 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 3.1.2 > Environment: emr-6.4.0 > spark 3.1.2 >Reporter: Dmitry Goldenberg >Priority: Major > > I'm having a major issue with trying to run in Spark 3, reading parquet data > that got generated with Spark 2.4. > The full stack trace is below. > The error message is very confusing: > # I do not have dates that before 1582-10-15 or timestamps before > 1900-01-01T00:00:00Z > # The documentation does not state clearly how to work around/fix this > issue. What exactly is the difference between the LEGACY and CORRECTED values > of the config settings? > # Which of the following would I want to set and to what values? - > spark.sql.legacy.parquet.datetimeRebaseModeInWrite > - spark.sql.legacy.parquet.datetimeRebaseModeInRead > - spark.sql.legacy.parquet.int96RebaseModeInRead > - spark.sql.legacy.parquet.int96RebaseModeInWrite > - spark.sql.legacy.timeParserPolicy > # I've tried setting these to CORRECTED,CORRECTED,CORRECTED,CORRECTED, and > LEGACY, respectively, and got the same error (see the stack trace). > The issues that I see with this: > # Lack of thorough clear documentation on what this is and how it's meant to > work. > # The confusing error message. > # The fact that the error still occurs even when you set the config values. > > {quote} py4j.protocol.Py4JJavaError: An error occurred while calling > o1134.count.py4j.protocol.Py4JJavaError: An error occurred while calling > o1134.count.: org.apache.spark.SparkException: Job aborted due to stage > failure: Task 8 in stage 36.0 failed 4 times, most recent failure: Lost task > 8.3 in stage 36.0 (TID 619) (ip-10-2-251-59.awsinternal.audiomack.com > executor 2): org.apache.spark.SparkUpgradeException: You may get a different > result due to the upgrading of Spark 3.0: reading dates before 1582-10-15 or > timestamps before 1900-01-01T00:00:00Z from Parquet INT96 files can be > ambiguous, as the files may be written by Spark 2.x or legacy versions of > Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s > Proleptic Gregorian calendar. See more details in SPARK-31404. You can set > spark.sql.legacy.parquet.int96RebaseModeInRead to 'LEGACY' to rebase the > datetime values w.r.t. the calendar difference during reading. Or set > spark.sql.legacy.parquet.int96RebaseModeInRead to 'CORRECTED' to read the > datetime values as it is. 
at > org.apache.spark.sql.execution.datasources.DataSourceUtils$.newRebaseExceptionInRead(DataSourceUtils.scala:159) > at > org.apache.spark.sql.execution.datasources.DataSourceUtils.newRebaseExceptionInRead(DataSourceUtils.scala) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.rebaseTimestamp(VectorizedColumnReader.java:228) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.rebaseInt96(VectorizedColumnReader.java:242) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:662) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:300) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:295) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:193) > at > org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37) > at >
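Editor's note on the questions above: the four rebase settings apply at different points. The datetimeRebaseModeIn* pair covers DATE and TIMESTAMP_MICROS/MILLIS columns, the int96RebaseModeIn* pair covers INT96 timestamps (which is what the exception names), and the *InRead/*InWrite variants are consulted when reading versus writing; spark.sql.legacy.timeParserPolicy concerns parsing date/time strings and is unrelated to this Parquet error. Since the exception is raised by the reader, only the read-side settings can affect it. Below is a hedged sketch, assuming the settings previously appeared ineffective because they were applied after the DataFrame was created or only to the write-side keys; the paths and app name are illustrative.

```python
# Hedged sketch (assumptions noted above): for Parquet written by Spark 2.x/Hive with
# INT96 timestamps, the exception points at the read-side INT96 setting.
# LEGACY rebases from the hybrid Julian calendar to the Proleptic Gregorian calendar
# while reading; CORRECTED reads the stored values as-is. For files known to come
# from Spark 2.x, LEGACY is the conservative choice. Set these before any read runs.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("read-legacy-parquet")  # illustrative app name
         .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
         .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
         .getOrCreate())

df = spark.read.parquet("s3://my-bucket/spark2-output/")  # illustrative path
print(df.count())
```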
[jira] [Created] (SPARK-36958) Reading of legacy timestamps from Parquet confusing in Spark 3, related config values don't seem working
Dmitry Goldenberg created SPARK-36958: - Summary: Reading of legacy timestamps from Parquet confusing in Spark 3, related config values don't seem working Key: SPARK-36958 URL: https://issues.apache.org/jira/browse/SPARK-36958 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 3.1.2 Environment: emr-6.4.0 spark 3.1.2 Reporter: Dmitry Goldenberg I'm having a major issue with trying to run in Spark 3, reading parquet data that got generated with Spark 2.4. The full stack trace is below. The error message is very confusing: # I do not have dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z # The documentation does not state clearly how to work around/fix this issue. What exactly is the difference between the LEGACY and CORRECTED values of the config settings? # Which of the following would I want to set and to what values? - spark.sql.legacy.parquet.datetimeRebaseModeInWrite - spark.sql.legacy.parquet.datetimeRebaseModeInRead - spark.sql.legacy.parquet.int96RebaseModeInRead - spark.sql.legacy.parquet.int96RebaseModeInWrite - spark.sql.legacy.timeParserPolicy # I've tried setting these to CORRECTED, CORRECTED, CORRECTED, CORRECTED, and LEGACY, respectively, and got the same error (see the stack trace). The issues that I see with this: # Lack of thorough, clear documentation on what this is and how it's meant to work. # The confusing error message. # The fact that the error still occurs even when you set the config values. {quote} py4j.protocol.Py4JJavaError: An error occurred while calling o1134.count.py4j.protocol.Py4JJavaError: An error occurred while calling o1134.count.: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 36.0 failed 4 times, most recent failure: Lost task 8.3 in stage 36.0 (TID 619) (ip-10-2-251-59.awsinternal.audiomack.com executor 2): org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet INT96 files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.legacy.parquet.int96RebaseModeInRead to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during reading. Or set spark.sql.legacy.parquet.int96RebaseModeInRead to 'CORRECTED' to read the datetime values as it is. 
at org.apache.spark.sql.execution.datasources.DataSourceUtils$.newRebaseExceptionInRead(DataSourceUtils.scala:159) at org.apache.spark.sql.execution.datasources.DataSourceUtils.newRebaseExceptionInRead(DataSourceUtils.scala) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.rebaseTimestamp(VectorizedColumnReader.java:228) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.rebaseInt96(VectorizedColumnReader.java:242) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:662) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:300) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:295) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:193) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:159) at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:614) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:832) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:131) at
[jira] [Comment Edited] (SPARK-8665) Update ALS documentation to include performance tips
[ https://issues.apache.org/jira/browse/SPARK-8665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044607#comment-17044607 ] Dmitry Goldenberg edited comment on SPARK-8665 at 2/25/20 4:04 PM: --- Any reason why this is marked as resolved? Performance tips would definitely be helpful, especially when dealing with larger datasets and especially when it comes to memory management. Also blockSize; see for example SPARK-20443. was (Author: dgoldenberg): Any reason why this is marked as resolved? performance tips would definitely be helpful esp. when dealing with larger datasets, esp. when it comes to memory management. > Update ALS documentation to include performance tips > > > Key: SPARK-8665 > URL: https://issues.apache.org/jira/browse/SPARK-8665 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 1.4.0 >Reporter: Xiangrui Meng >Assignee: Xiangrui Meng >Priority: Major > Labels: bulk-closed > Original Estimate: 1h > Remaining Estimate: 1h > > With the new ALS implementation, users still need to deal with > computation/communication trade-offs. It would be nice to document this > clearly based on the issues on the mailing list. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
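Editor's note: as an illustration of the kind of guidance the comment asks for, here is a hedged sketch (not official documentation) of the knobs that typically matter for ALS memory and lineage behaviour on larger datasets in the DataFrame API. Column names, paths, and the specific values are illustrative, and blockSize is only available in newer releases (Spark 3.0+, if I recall correctly).

```python
# Hedged sketch of common ALS tuning on larger datasets (values are illustrative):
# more user/item blocks shrink per-task memory, checkpointing truncates the iteration
# lineage so executors don't accumulate long DAGs, and blockSize (Spark 3.0+) controls
# the size of the in-block arrays used during factorization.
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("als-tuning").getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///tmp/als-checkpoints")  # illustrative path

ratings = spark.read.parquet("hdfs:///data/ratings")  # assumed columns: userId, itemId, rating

als = ALS(userCol="userId", itemCol="itemId", ratingCol="rating",
          rank=50, maxIter=10, regParam=0.1,
          numUserBlocks=200, numItemBlocks=200,  # more blocks -> smaller per-task memory
          checkpointInterval=2,                  # truncate lineage every 2 iterations
          blockSize=2048)                        # Spark 3.0+ only; default is 4096
model = als.fit(ratings)
```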
[jira] [Commented] (SPARK-8665) Update ALS documentation to include performance tips
[ https://issues.apache.org/jira/browse/SPARK-8665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044607#comment-17044607 ] Dmitry Goldenberg commented on SPARK-8665: -- Any reason why this is marked as resolved? performance tips would definitely be helpful esp. when dealing with larger datasets, esp. when it comes to memory management. > Update ALS documentation to include performance tips > > > Key: SPARK-8665 > URL: https://issues.apache.org/jira/browse/SPARK-8665 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 1.4.0 >Reporter: Xiangrui Meng >Assignee: Xiangrui Meng >Priority: Major > Labels: bulk-closed > Original Estimate: 1h > Remaining Estimate: 1h > > With the new ALS implementation, users still need to deal with > computation/communication trade-offs. It would be nice to document this > clearly based on the issues on the mailing list. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-27567) Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'
[ https://issues.apache.org/jira/browse/SPARK-27567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Goldenberg resolved SPARK-27567. --- Resolution: Not A Bug In our case, issue was caused by someone changing the IP address of the box. Ideally, Kafka or Spark could have more "telling", more intuitive error messages for these types of issues. > Spark Streaming consumers (from Kafka) intermittently die with > 'SparkException: Couldn't find leaders for Set' > -- > > Key: SPARK-27567 > URL: https://issues.apache.org/jira/browse/SPARK-27567 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 1.5.0 > Environment: GCP / 170~14.04.1-Ubuntu >Reporter: Dmitry Goldenberg >Priority: Major > > Some of our consumers intermittently die with the stack traces I'm including. > Once restarted they run for a while then die again. > I can't find any cohesive documentation on what this error means and how to > go about troubleshooting it. Any help would be appreciated. > *Kafka version* is 0.8.2.1 (2.10-0.8.2.1). > Some of the errors seen look like this: > {noformat} > ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on > 10.150.0.54: remote Rpc client disassociated{noformat} > Main error stack trace: > {noformat} > 2019-04-23 20:36:54,323 ERROR > org.apache.spark.streaming.scheduler.JobScheduler: Error g > enerating jobs for time 1556066214000 ms > org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: > Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], > [hdfs.hbase.acme.attachmen > ts,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], > [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], > [hdfs.hbase.acme.attachme > nts,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], > [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], > [hdfs.hbase.acme.attachme > nts,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], > [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], > [hdfs.hbase.acme.attach > ments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], > [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], > [hdfs.hbase.acme.attac > hments,29], [hdfs.hbase.acme.attachments,33], > [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], > [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.att > achments,21], [hdfs.hbase.acme.attachments,3], > [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], > [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.at > tachments,61])) > at > org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.j > ar:?] > at > org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > ~[spark-assembly-1.5.0-hadoop2.4.0.ja > r:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > ~[spark-assembly-1.5.0-hadoop2.4.0.ja > r:1.5.0] > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] 
> at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at scala.Option.orElse(Option.scala:257) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at >
[jira] [Commented] (SPARK-27567) Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'
[ https://issues.apache.org/jira/browse/SPARK-27567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16855996#comment-16855996 ] Dmitry Goldenberg commented on SPARK-27567: --- OK, it appears that the IP address of the box in question got changed and that sent Kafka (and the consumers along with it) haywire. We can probably close the ticket but this may be useful to other folks. The takeaway is, check your networking settings; any changes to the IP address or the like. Kafka doesn't respond too well to that :) > Spark Streaming consumers (from Kafka) intermittently die with > 'SparkException: Couldn't find leaders for Set' > -- > > Key: SPARK-27567 > URL: https://issues.apache.org/jira/browse/SPARK-27567 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 1.5.0 > Environment: GCP / 170~14.04.1-Ubuntu >Reporter: Dmitry Goldenberg >Priority: Major > > Some of our consumers intermittently die with the stack traces I'm including. > Once restarted they run for a while then die again. > I can't find any cohesive documentation on what this error means and how to > go about troubleshooting it. Any help would be appreciated. > *Kafka version* is 0.8.2.1 (2.10-0.8.2.1). > Some of the errors seen look like this: > {noformat} > ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on > 10.150.0.54: remote Rpc client disassociated{noformat} > Main error stack trace: > {noformat} > 2019-04-23 20:36:54,323 ERROR > org.apache.spark.streaming.scheduler.JobScheduler: Error g > enerating jobs for time 1556066214000 ms > org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: > Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], > [hdfs.hbase.acme.attachmen > ts,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], > [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], > [hdfs.hbase.acme.attachme > nts,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], > [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], > [hdfs.hbase.acme.attachme > nts,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], > [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], > [hdfs.hbase.acme.attach > ments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], > [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], > [hdfs.hbase.acme.attac > hments,29], [hdfs.hbase.acme.attachments,33], > [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], > [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.att > achments,21], [hdfs.hbase.acme.attachments,3], > [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], > [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.at > tachments,61])) > at > org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.j > ar:?] > at > org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] 
> at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > ~[spark-assembly-1.5.0-hadoop2.4.0.ja > r:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > ~[spark-assembly-1.5.0-hadoop2.4.0.ja > r:1.5.0] > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at scala.Option.orElse(Option.scala:257) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at >
[jira] [Reopened] (SPARK-27567) Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'
[ https://issues.apache.org/jira/browse/SPARK-27567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Goldenberg reopened SPARK-27567: --- Issue not resolved, it appears intermittently. We have a QA box with Kafka installed and our Spark Streaming job pulling from it. Kafka has replication factor set to 1 since it's a cluster of 1 node. Just saw the error there. > Spark Streaming consumers (from Kafka) intermittently die with > 'SparkException: Couldn't find leaders for Set' > -- > > Key: SPARK-27567 > URL: https://issues.apache.org/jira/browse/SPARK-27567 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 1.5.0 > Environment: GCP / 170~14.04.1-Ubuntu >Reporter: Dmitry Goldenberg >Priority: Major > > Some of our consumers intermittently die with the stack traces I'm including. > Once restarted they run for a while then die again. > I can't find any cohesive documentation on what this error means and how to > go about troubleshooting it. Any help would be appreciated. > *Kafka version* is 0.8.2.1 (2.10-0.8.2.1). > Some of the errors seen look like this: > {noformat} > ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on > 10.150.0.54: remote Rpc client disassociated{noformat} > Main error stack trace: > {noformat} > 2019-04-23 20:36:54,323 ERROR > org.apache.spark.streaming.scheduler.JobScheduler: Error g > enerating jobs for time 1556066214000 ms > org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: > Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], > [hdfs.hbase.acme.attachmen > ts,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], > [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], > [hdfs.hbase.acme.attachme > nts,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], > [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], > [hdfs.hbase.acme.attachme > nts,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], > [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], > [hdfs.hbase.acme.attach > ments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], > [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], > [hdfs.hbase.acme.attac > hments,29], [hdfs.hbase.acme.attachments,33], > [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], > [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.att > achments,21], [hdfs.hbase.acme.attachments,3], > [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], > [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.at > tachments,61])) > at > org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.j > ar:?] > at > org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > ~[spark-assembly-1.5.0-hadoop2.4.0.ja > r:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > ~[spark-assembly-1.5.0-hadoop2.4.0.ja > r:1.5.0] > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] 
> at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at scala.Option.orElse(Option.scala:257) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at > org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) > ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] > at >
[jira] [Commented] (SPARK-8526) Provide a way to define custom metrics and custom metric sink in Spark
[ https://issues.apache.org/jira/browse/SPARK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844761#comment-16844761 ] Dmitry Goldenberg commented on SPARK-8526: -- Any reason why this is marked as 'resolved'? What's the resolution? If it's incomplete, what's missing? > Provide a way to define custom metrics and custom metric sink in Spark > -- > > Key: SPARK-8526 > URL: https://issues.apache.org/jira/browse/SPARK-8526 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.4.0 >Reporter: Dmitry Goldenberg >Priority: Major > Labels: bulk-closed > > There is a number of user questions posted into the Spark mailing list > regarding how to instrument custom metrics within Spark applications: > * how to define custom metrics > * how to define a custom metrics sink such as a custom metrics servlet > * how to call back into the consumers from the sink to report on some actions > potentially taken by the metrics sink > Some of this may be a documentation issue. Currently the Metrics section > under https://spark.apache.org/docs/latest/monitoring.html is rather sparse. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
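Editor's note on the "how do I wire up a metrics sink" part of the question: the mechanics that exist today go through Spark's metrics properties. Below is a hedged sketch; to my knowledge the properties can be supplied either in a metrics.properties file referenced by spark.metrics.conf or inline with the spark.metrics.conf.* prefix, but this should be verified against the Monitoring page for the Spark version in use. The sink class here is the built-in ConsoleSink, and the period/unit values are illustrative; a truly custom sink still means implementing the JVM-side Sink interface, which the issue rightly notes is under-documented.

```python
# Hedged sketch: enable a built-in metrics sink via inline metrics properties.
# Verify the spark.metrics.conf.* prefix support against the docs for your version;
# the alternative is shipping a metrics.properties file and setting spark.metrics.conf.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("custom-metrics")  # illustrative app name
         .config("spark.metrics.conf.*.sink.console.class",
                 "org.apache.spark.metrics.sink.ConsoleSink")
         .config("spark.metrics.conf.*.sink.console.period", "30")
         .config("spark.metrics.conf.*.sink.console.unit", "seconds")
         .getOrCreate())
```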
[jira] [Commented] (SPARK-27567) Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'
[ https://issues.apache.org/jira/browse/SPARK-27567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16829555#comment-16829555 ] Dmitry Goldenberg commented on SPARK-27567: --- Hi Liang-Chi, So you must be referring to this comment from that post, I gather: {noformat} This is expected behaviour. You have requested that each topic be stored on one machine by setting ReplicationFactor to one. When the one machine that happens to store the topic normalized-tenant4 is taken down, the consumer cannot find the leader of the topic. See http://kafka.apache.org/documentation.html#intro_guarantees.{noformat} I believe that indeed we have replication factor set to 1 for Kafka in this particular cluster I'm looking at. Could it possibly be that the node that happens to hold particular segments of Kafka data becomes intermittently inaccessible (e.g. due to a network hiccup) and then the consumer is not able to retrieve the data from Kafka? And then the issue is really one of Kafka configuration? It sounds like if we increase the replication factor on the Kafka side the issue on the Spark side may go away. Agree / disagree? In any case, the error "Couldn't find leaders for Set" is not very direct in terms of what the error causes may be. Perhaps this can be improved in Spark Streaming. At the very least, this Jira incident may help others. I'll look into increasing the replication factor on the Kafka side and update this ticket. > Spark Streaming consumers (from Kafka) intermittently die with > 'SparkException: Couldn't find leaders for Set' > -- > > Key: SPARK-27567 > URL: https://issues.apache.org/jira/browse/SPARK-27567 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 1.5.0 > Environment: GCP / 170~14.04.1-Ubuntu >Reporter: Dmitry Goldenberg >Priority: Major > > Some of our consumers intermittently die with the stack traces I'm including. > Once restarted they run for a while then die again. > I can't find any cohesive documentation on what this error means and how to > go about troubleshooting it. Any help would be appreciated. > *Kafka version* is 0.8.2.1 (2.10-0.8.2.1). 
> Some of the errors seen look like this: > {noformat} > ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on > 10.150.0.54: remote Rpc client disassociated{noformat} > Main error stack trace: > {noformat} > 2019-04-23 20:36:54,323 ERROR > org.apache.spark.streaming.scheduler.JobScheduler: Error g > enerating jobs for time 1556066214000 ms > org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: > Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], > [hdfs.hbase.acme.attachmen > ts,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], > [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], > [hdfs.hbase.acme.attachme > nts,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], > [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], > [hdfs.hbase.acme.attachme > nts,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], > [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], > [hdfs.hbase.acme.attach > ments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], > [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], > [hdfs.hbase.acme.attac > hments,29], [hdfs.hbase.acme.attachments,33], > [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], > [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.att > achments,21], [hdfs.hbase.acme.attachments,3], > [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], > [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.at > tachments,61])) > at > org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.j > ar:?] > at > org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > ~[spark-assembly-1.5.0-hadoop2.4.0.ja > r:1.5.0] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > ~[spark-assembly-1.5.0-hadoop2.4.0.ja > r:1.5.0] > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) >
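Editor's note: one Spark-side knob possibly worth mentioning alongside the replication-factor change. In the Spark 1.x direct stream, the driver-side leader/offset lookup seen at the top of this trace (latestLeaderOffsets) is retried spark.streaming.kafka.maxRetries times, which defaults to 1, so a brief metadata hiccup fails the batch immediately. A hedged sketch of raising it is below; it would not help if the single replica is genuinely down, and the app name is illustrative.

```python
# Hedged sketch: let the Spark 1.x direct-stream driver retry its Kafka metadata/leader
# lookups a few times instead of failing the batch on the first hiccup.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setAppName("attachments-consumer")                          # illustrative app name
        .set("spark.streaming.kafka.maxRetries", "3")                # default is 1
        .set("spark.streaming.kafka.maxRatePerPartition", "1000"))   # optional per-partition rate cap
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
# ... create the direct Kafka stream and the rest of the job as before ...
```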
[jira] [Updated] (SPARK-27567) Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'
[ https://issues.apache.org/jira/browse/SPARK-27567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Goldenberg updated SPARK-27567: -- Description: Some of our consumers intermittently die with the stack traces I'm including. Once restarted they run for a while then die again. I can't find any cohesive documentation on what this error means and how to go about troubleshooting it. Any help would be appreciated. *Kafka version* is 0.8.2.1 (2.10-0.8.2.1). Some of the errors seen look like this: {noformat} ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on 10.150.0.54: remote Rpc client disassociated{noformat} Main error stack trace: {noformat} 2019-04-23 20:36:54,323 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error g enerating jobs for time 1556066214000 ms org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], [hdfs.hbase.acme.attachmen ts,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], [hdfs.hbase.acme.attachme nts,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], [hdfs.hbase.acme.attachme nts,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], [hdfs.hbase.acme.attach ments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], [hdfs.hbase.acme.attac hments,29], [hdfs.hbase.acme.attachments,33], [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.att achments,21], [hdfs.hbase.acme.attachments,3], [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.at tachments,61])) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.j ar:?] at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja r:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja r:1.5.0] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at scala.Option.orElse(Option.scala:257) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja r:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja r:1.5.0] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at
[jira] [Created] (SPARK-27567) Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'
Dmitry Goldenberg created SPARK-27567: - Summary: Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set' Key: SPARK-27567 URL: https://issues.apache.org/jira/browse/SPARK-27567 Project: Spark Issue Type: Bug Components: DStreams Affects Versions: 1.5.0 Environment: GCP / 170~14.04.1-Ubuntu Reporter: Dmitry Goldenberg Some of our consumers intermittently die with the stack traces I'm including. Once restarted they run for a while then die again. I can't find any cohesive documentation on what this error means and how to go about troubleshooting it. Any help would be appreciated. Some of the errors seen look like this: ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on 10.150.0.54: remote Rpc client disassociated Main error stack trace: {noformat} 2019-04-23 20:36:54,323 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error g enerating jobs for time 1556066214000 ms org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], [hdfs.hbase.acme.attachmen ts,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], [hdfs.hbase.acme.attachme nts,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], [hdfs.hbase.acme.attachme nts,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], [hdfs.hbase.acme.attach ments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], [hdfs.hbase.acme.attac hments,29], [hdfs.hbase.acme.attachments,33], [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.att achments,21], [hdfs.hbase.acme.attachments,3], [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.at tachments,61])) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.j ar:?] at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja r:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja r:1.5.0] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at scala.Option.orElse(Option.scala:257) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja r:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja r:1.5.0] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
[jira] [Commented] (SPARK-27529) Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException
[ https://issues.apache.org/jira/browse/SPARK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16826072#comment-16826072 ] Dmitry Goldenberg commented on SPARK-27529: --- Hi Hyukjin, I could check, although it seems this behavior has been there a while and in all likelihood has not changed. A check with a higher version will entail some effort because a lot has changed in Kafka and in Spark. I was hoping to get a more precise answer from someone who wrote the code in Spark Streaming or is familiar with it at a detailed level. Is this a bug or expected behavior? If it's expected behavior, what's causing it and how could we work around it? > Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException > - > > Key: SPARK-27529 > URL: https://issues.apache.org/jira/browse/SPARK-27529 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 1.5.0 >Reporter: Dmitry Goldenberg >Priority: Major > > We have a Spark Streaming consumer which at a certain point started > consistently failing upon a restart with the below error. > Some details: > * Spark version is 1.5.0. > * Kafka version is 0.8.2.1 (2.10-0.8.2.1). > * The topic is configured with: retention.ms=1471228928, > max.message.bytes=1. > * The consumer runs with auto.offset.reset=smallest. > * No checkpointing is currently enabled. > I don't see anything in the Spark or Kafka doc to understand why this is > happening. From googling around, > {noformat} > https://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/ > Finally, I’ll repeat that any semantics beyond at-most-once require that you > have sufficient log retention in Kafka. If you’re seeing things like > OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka > storage, not because something’s wrong with Spark or Kafka.{noformat} > Also looking at SPARK-12693 and SPARK-11693, I don't understand the possible > causes. > {noformat} > You've under-provisioned Kafka storage and / or Spark compute capacity. > The result is that data is being deleted before it has been > processed.{noformat} > All we're trying to do is start the consumer and consume from the topic from > the earliest available offset. Why would we not be able to do that? How can > the offsets be out of range if we're saying, just read from the earliest > available? > Since we have the retention.ms set to 1 year and we created the topic just a > few weeks ago, I'd not expect any deletion being done by Kafka as we're > consuming. > I'd like to understand the actual cause of this error. Any recommendations on > a workaround would be appreciated. 
> Stack traces: > {noformat} > 2019-04-19 11:35:17,147 ERROR org.apache.spark.scheduler > .TaskSetManager: Task 10 in stage 147.0 failed 4 times; aborting job > 2019-04-19 11:35:17,160 ERROR > org.apache.spark.streaming.scheduler.JobScheduler: Error running job > streaming job 1555682554000 ms.0 > org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in > stage 147.0 failed 4 times, most recent failure: Lost task > 10.3 in stage 147.0 (TID 2368, 10.150.0.58): > kafka.common.OffsetOutOfRangeException > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at java.lang.Class.newInstance(Class.java:442) > at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at > scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29) > at > com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69) > at > com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24) > at > org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) > at > org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) > at > org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) > at >
[jira] [Updated] (SPARK-27529) Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException
[ https://issues.apache.org/jira/browse/SPARK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Goldenberg updated SPARK-27529: -- Description: We have a Spark Streaming consumer which at a certain point started consistently failing upon a restart with the below error. Some details: * Spark version is 1.5.0. * Kafka version is 0.8.2.1 (2.10-0.8.2.1). * The topic is configured with: retention.ms=1471228928, max.message.bytes=1. * The consumer runs with auto.offset.reset=smallest. * No checkpointing is currently enabled. I don't see anything in the Spark or Kafka doc to understand why this is happening. From googling around, {noformat} https://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/ Finally, I’ll repeat that any semantics beyond at-most-once require that you have sufficient log retention in Kafka. If you’re seeing things like OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka storage, not because something’s wrong with Spark or Kafka.{noformat} Also looking at SPARK-12693 and SPARK-11693, I don't understand the possible causes. {noformat} You've under-provisioned Kafka storage and / or Spark compute capacity. The result is that data is being deleted before it has been processed.{noformat} All we're trying to do is start the consumer and consume from the topic from the earliest available offset. Why would we not be able to do that? How can the offsets be out of range if we're saying, just read from the earliest available? Since we have the retention.ms set to 1 year and we created the topic just a few weeks ago, I'd not expect any deletion being done by Kafka as we're consuming. I'd like to understand the actual cause of this error. Any recommendations on a workaround would be appreciated. 
Stack traces: {noformat} 2019-04-19 11:35:17,147 ERROR org.apache.spark.scheduler.TaskSetManager: Task 10 in stage 147.0 failed 4 times; aborting job 2019-04-19 11:35:17,160 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1555682554000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 147.0 failed 4 times, most recent failure: Lost task 10.3 in stage 147.0 (TID 2368, 10.150.0.58): kafka.common.OffsetOutOfRangeException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at java.lang.Class.newInstance(Class.java:442) at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29) at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69) at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0] at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
[jira] [Created] (SPARK-27529) Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException
Dmitry Goldenberg created SPARK-27529: - Summary: Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException Key: SPARK-27529 URL: https://issues.apache.org/jira/browse/SPARK-27529 Project: Spark Issue Type: Bug Components: DStreams Affects Versions: 1.5.0 Reporter: Dmitry Goldenberg We have a Spark Streaming consumer which at a certain point started consistently failing upon a restart with the below error. Some details: * Spark version is 1.5.0. * Kafka version is 0.8.2.1 (2.10-0.8.2.1). * The topic is configured with: retention.ms=1471228928, max.message.bytes=1. * The consumer runs with auto.offset.reset=smallest. * No checkpointing is currently enabled. I don't see anything in the Spark or Kafka doc to understand why this is happening. From googling around, {noformat} https://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/ Finally, I’ll repeat that any semantics beyond at-most-once require that you have sufficient log retention in Kafka. If you’re seeing things like OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka storage, not because something’s wrong with Spark or Kafka.{noformat} Also looking at SPARK-12693 and SPARK-11693, I don't understand the possible causes. {noformat} You've under-provisioned Kafka storage and / or Spark compute capacity. The result is that data is being deleted before it has been processed.{noformat} All we're trying to do is start the consumer and consume from the topic from the earliest available offset. Why would we not be able to do that? How can the offsets be out of range if we're saying, just read from the earliest available? Since we have the retention.ms set to 1 year and we created the topic just a few weeks ago, I'd not expect any deletion being done by Kafka as we're consuming. The behavior we're seeing on the consumer side does not feel intuitive or cohesive to me. If it is, I'd like to know how to work around it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
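For context, a minimal sketch of the kind of direct-stream consumer described above, assuming the Kafka 0.8 direct API that shipped with Spark 1.5 and hypothetical broker and topic names; this is not the reporter's actual code. One common reading of the error, not confirmed in this thread, is that auto.offset.reset only chooses the starting point when no offsets are known yet, whereas a batch whose planned offset range has since been trimmed by retention fails with OffsetOutOfRangeException.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object EarliestOffsetConsumer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("earliest-offset-consumer")
    val ssc  = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092", // hypothetical broker list
      "auto.offset.reset"    -> "smallest")     // start from the earliest retained offset

    // Direct (receiver-less) stream over a hypothetical topic.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("myTopic"))

    stream.foreachRDD(rdd => println(s"batch record count: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```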
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14646419#comment-14646419 ] Dmitry Goldenberg commented on SPARK-9434: -- Thanks Cody and Sean. I'll take a look at the process of submitting changes. Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API - Key: SPARK-9434 URL: https://issues.apache.org/jira/browse/SPARK-9434 Project: Spark Issue Type: Improvement Components: Documentation, Examples, Streaming Affects Versions: 1.4.1 Reporter: Dmitry Goldenberg We've been getting some mixed information regarding how to cause our direct streaming consumers to resume processing from where they left off in terms of the Kafka offsets. On the one hand side, we're hearing If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down). On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which are marked as won't fix which seem to ask for the functionality we need, with comments like I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need. The use-case is actually very clear and doesn't ask for confusing semantics. An API option to resume reading where you left off, in addition to the smallest or greatest auto.offset.reset should be *very* useful, probably for quite a few folks. We're asking for this as an enhancement request. SPARK-8833 states I am waiting for getting enough usecase to float in before I take a final call. We're adding to that. In the meantime, can you clarify the confusion? Does direct streaming persist the progress information into DStream checkpoints or does it not? If it does, why is it that we're not seeing that happen? Our consumers start with auto.offset.reset=greatest and that causes them to read from the first offset of data that is written to Kafka *after* the consumer has been restarted, meaning we're missing data that had come in while the consumer was down. If the progress is stored in DStream checkpoints, we want to know a) how to cause that to work for us and b) where the said checkpointing data is stored physically. Conversely, if this is not accurate, then is our only choice to manually persist the offsets into Zookeeper? If that is the case then a) we'd like a clear, more complete code sample to be published, since the one in the Kafka streaming guide is incomplete (it lacks the actual lines of code persisting the offsets) and b) we'd like to request that SPARK-8833 be revisited as a feature worth implementing in the API. Thanks. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Goldenberg resolved SPARK-9434. -- Resolution: Not A Problem Documentation on checkpointing is available in the Spark doc set. Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API - Key: SPARK-9434 URL: https://issues.apache.org/jira/browse/SPARK-9434 Project: Spark Issue Type: Improvement Components: Documentation, Examples, Streaming Affects Versions: 1.4.1 Reporter: Dmitry Goldenberg We've been getting some mixed information regarding how to cause our direct streaming consumers to resume processing from where they left off in terms of the Kafka offsets. On the one hand side, we're hearing If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down). On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which are marked as won't fix which seem to ask for the functionality we need, with comments like I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need. The use-case is actually very clear and doesn't ask for confusing semantics. An API option to resume reading where you left off, in addition to the smallest or greatest auto.offset.reset should be *very* useful, probably for quite a few folks. We're asking for this as an enhancement request. SPARK-8833 states I am waiting for getting enough usecase to float in before I take a final call. We're adding to that. In the meantime, can you clarify the confusion? Does direct streaming persist the progress information into DStream checkpoints or does it not? If it does, why is it that we're not seeing that happen? Our consumers start with auto.offset.reset=greatest and that causes them to read from the first offset of data that is written to Kafka *after* the consumer has been restarted, meaning we're missing data that had come in while the consumer was down. If the progress is stored in DStream checkpoints, we want to know a) how to cause that to work for us and b) where the said checkpointing data is stored physically. Conversely, if this is not accurate, then is our only choice to manually persist the offsets into Zookeeper? If that is the case then a) we'd like a clear, more complete code sample to be published, since the one in the Kafka streaming guide is incomplete (it lacks the actual lines of code persisting the offsets) and b) we'd like to request that SPARK-8833 be revisited as a feature worth implementing in the API. Thanks. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
Dmitry Goldenberg created SPARK-9434: Summary: Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API Key: SPARK-9434 URL: https://issues.apache.org/jira/browse/SPARK-9434 Project: Spark Issue Type: Improvement Components: Documentation, Examples, Streaming Affects Versions: 1.4.1 Reporter: Dmitry Goldenberg We've been getting some mixed information regarding how to cause our direct streaming consumers to resume processing from where they left off in terms of the Kafka offsets. On the one hand side, we're hearing If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down). On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which are marked as won't fix which seem to ask for the functionality we need, with comments like I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need. The use-case is actually very clear and doesn't ask for confusing semantics. An API option to resume reading where you left off, in addition to the smallest or greatest auto.offset.reset should be *very* useful, probably for quite a few folks. We're asking for this as an enhancement request. SPARK-8833 states I am waiting for getting enough usecase to float in before I take a final call. We're adding to that. In the meantime, can you clarify the confusion? Does direct streaming persist the progress information into DStream checkpoints or does it not? If it does, why is it that we're not seeing that happen? Our consumers start with auto.offset.reset=greatest and that causes them to read from the first offset of data that is written to Kafka *after* the consumer has been restarted, meaning we're missing data that had come in while the consumer was down. If the progress is stored in DStream checkpoints, we want to know a) how to cause that to work for us and b) where the said checkpointing data is stored physically. Conversely, if this is not accurate, then is our only choice to manually persist the offsets into Zookeeper? If that is the case then a) we'd like a clear, more complete code sample to be published, since the one in the Kafka streaming guide is incomplete (it lacks the actual lines of code persisting the offsets) and b) we'd like to request that SPARK-8833 be revisited as a feature worth implementing in the API. Thanks. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14645923#comment-14645923 ] Dmitry Goldenberg commented on SPARK-9434: -- Sean, I specifically did not want to add my comments there because that ticket is marked as Won't fix. And now you seem to be resolving this as a DUP, which IMHO is too quick a resolution without a discussion. Do you want to manage Kafka streaming using Kafka offsets? The answer is, it depends. I first need to know if that is what I will have to do. Please re-read what I stated: On the one hand side, we're hearing If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down). If Direct streaming already does the progress persistence for us, I'm all for it and problem solved. I will need to know, if that is the case, how to enable this behavior, because I am not seeing it in my testing. However, if, to achieve the effect of resuming from where the consumer is left off, if I need to manually manage the offsets then yes, I want to file an enhancement request which would make this option explicit in the API rather than us having to implement it. In the meantime, a fuller sample for persisting and retrieving offsets with OffsetCommitRequest, OffsetFetchRequest would also be helpful. Right now, the existing sample in the Kafka streaming doc doesn't include a fetch example and the update offsets example doesn't fully demonstrate the update logic. Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API - Key: SPARK-9434 URL: https://issues.apache.org/jira/browse/SPARK-9434 Project: Spark Issue Type: Improvement Components: Documentation, Examples, Streaming Affects Versions: 1.4.1 Reporter: Dmitry Goldenberg We've been getting some mixed information regarding how to cause our direct streaming consumers to resume processing from where they left off in terms of the Kafka offsets. On the one hand side, we're hearing If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down). On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which are marked as won't fix which seem to ask for the functionality we need, with comments like I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need. The use-case is actually very clear and doesn't ask for confusing semantics. An API option to resume reading where you left off, in addition to the smallest or greatest auto.offset.reset should be *very* useful, probably for quite a few folks. We're asking for this as an enhancement request. SPARK-8833 states I am waiting for getting enough usecase to float in before I take a final call. We're adding to that. In the meantime, can you clarify the confusion? 
Does direct streaming persist the progress information into DStream checkpoints or does it not? If it does, why is it that we're not seeing that happen? Our consumers start with auto.offset.reset=greatest and that causes them to read from the first offset of data that is written to Kafka *after* the consumer has been restarted, meaning we're missing data that had come in while the consumer was down. If the progress is stored in DStream checkpoints, we want to know a) how to cause that to work for us and b) where the said checkpointing data is stored physically. Conversely, if this is not accurate, then is our only choice to manually persist the offsets into Zookeeper? If that is the case then a) we'd like a clear, more complete code sample to be published, since the one in the Kafka streaming guide is incomplete (it lacks the actual lines of code persisting the offsets) and b) we'd like to request that SPARK-8833 be revisited as a feature worth implementing in the API. Thanks.
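On the request for a fuller offset-persistence example: the sketch below shows only the documented half of the pattern, capturing per-partition offset ranges from the direct stream via HasOffsetRanges. The actual write to ZooKeeper (e.g. via OffsetCommitRequest) or any other store is left as a placeholder, and `stream` is assumed to be the DStream returned by KafkaUtils.createDirectStream.

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// `stream` is assumed to come from KafkaUtils.createDirectStream (see the earlier sketch).
var offsetRanges = Array.empty[OffsetRange]

stream.transform { rdd =>
  // Capture the offset ranges on the driver, before any transformation that
  // would lose the underlying KafkaRDD type.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  rdd.foreachPartition(_ => ()) // placeholder for the real per-partition processing

  // Persist the offsets only after the batch has been processed, so a restarted
  // job resumes from here. The actual write (ZooKeeper, a database, etc.) is omitted.
  offsetRanges.foreach { range =>
    println(s"${range.topic} / ${range.partition}: ${range.fromOffset} -> ${range.untilOffset}")
  }
}
```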
[jira] [Reopened] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Goldenberg reopened SPARK-9434: -- Not resolved. Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API - Key: SPARK-9434 URL: https://issues.apache.org/jira/browse/SPARK-9434 Project: Spark Issue Type: Improvement Components: Documentation, Examples, Streaming Affects Versions: 1.4.1 Reporter: Dmitry Goldenberg We've been getting some mixed information regarding how to cause our direct streaming consumers to resume processing from where they left off in terms of the Kafka offsets. On the one hand side, we're hearing If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down). On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which are marked as won't fix which seem to ask for the functionality we need, with comments like I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need. The use-case is actually very clear and doesn't ask for confusing semantics. An API option to resume reading where you left off, in addition to the smallest or greatest auto.offset.reset should be *very* useful, probably for quite a few folks. We're asking for this as an enhancement request. SPARK-8833 states I am waiting for getting enough usecase to float in before I take a final call. We're adding to that. In the meantime, can you clarify the confusion? Does direct streaming persist the progress information into DStream checkpoints or does it not? If it does, why is it that we're not seeing that happen? Our consumers start with auto.offset.reset=greatest and that causes them to read from the first offset of data that is written to Kafka *after* the consumer has been restarted, meaning we're missing data that had come in while the consumer was down. If the progress is stored in DStream checkpoints, we want to know a) how to cause that to work for us and b) where the said checkpointing data is stored physically. Conversely, if this is not accurate, then is our only choice to manually persist the offsets into Zookeeper? If that is the case then a) we'd like a clear, more complete code sample to be published, since the one in the Kafka streaming guide is incomplete (it lacks the actual lines of code persisting the offsets) and b) we'd like to request that SPARK-8833 be revisited as a feature worth implementing in the API. Thanks. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14645971#comment-14645971 ] Dmitry Goldenberg commented on SPARK-9434: -- [~tdas]: Could you weigh in on this discussion? In our previous exchanges, we talked about If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down) Is it indeed the case that the *last read offsets are automatically recovered?? I am not seeing that. Our consumers resume from the first data that's added to Kafka *after the consumers have been restarted, leading to the loss of data added *while the consumers were down*. Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API - Key: SPARK-9434 URL: https://issues.apache.org/jira/browse/SPARK-9434 Project: Spark Issue Type: Improvement Components: Documentation, Examples, Streaming Affects Versions: 1.4.1 Reporter: Dmitry Goldenberg We've been getting some mixed information regarding how to cause our direct streaming consumers to resume processing from where they left off in terms of the Kafka offsets. On the one hand side, we're hearing If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down). On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which are marked as won't fix which seem to ask for the functionality we need, with comments like I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need. The use-case is actually very clear and doesn't ask for confusing semantics. An API option to resume reading where you left off, in addition to the smallest or greatest auto.offset.reset should be *very* useful, probably for quite a few folks. We're asking for this as an enhancement request. SPARK-8833 states I am waiting for getting enough usecase to float in before I take a final call. We're adding to that. In the meantime, can you clarify the confusion? Does direct streaming persist the progress information into DStream checkpoints or does it not? If it does, why is it that we're not seeing that happen? Our consumers start with auto.offset.reset=greatest and that causes them to read from the first offset of data that is written to Kafka *after* the consumer has been restarted, meaning we're missing data that had come in while the consumer was down. If the progress is stored in DStream checkpoints, we want to know a) how to cause that to work for us and b) where the said checkpointing data is stored physically. Conversely, if this is not accurate, then is our only choice to manually persist the offsets into Zookeeper? 
If that is the case then a) we'd like a clear, more complete code sample to be published, since the one in the Kafka streaming guide is incomplete (it lacks the actual lines of code persisting the offsets) and b) we'd like to request that SPARK-8833 be revisited as a feature worth implementing in the API. Thanks.
[jira] [Comment Edited] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14645971#comment-14645971 ] Dmitry Goldenberg edited comment on SPARK-9434 at 7/29/15 12:30 PM: [~tdas]: Could you weigh in on this discussion? In our previous exchanges, we talked about If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down) Is it indeed the case that the *last read offsets are automatically recovered*?? I am not seeing that. Our consumers resume from the first data that's added to Kafka *after the consumers have been restarted*, leading to the loss of data added *while the consumers were down*. was (Author: dgoldenberg): [~tdas]: Could you weigh in on this discussion? In our previous exchanges, we talked about If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down) Is it indeed the case that the *last read offsets are automatically recovered*?? I am not seeing that. Our consumers resume from the first data that's added to Kafka *after the consumers have been restarted, leading to the loss of data added *while the consumers were down*. Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API - Key: SPARK-9434 URL: https://issues.apache.org/jira/browse/SPARK-9434 Project: Spark Issue Type: Improvement Components: Documentation, Examples, Streaming Affects Versions: 1.4.1 Reporter: Dmitry Goldenberg We've been getting some mixed information regarding how to cause our direct streaming consumers to resume processing from where they left off in terms of the Kafka offsets. On the one hand side, we're hearing If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down). On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which are marked as won't fix which seem to ask for the functionality we need, with comments like I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need. The use-case is actually very clear and doesn't ask for confusing semantics. An API option to resume reading where you left off, in addition to the smallest or greatest auto.offset.reset should be *very* useful, probably for quite a few folks. We're asking for this as an enhancement request. SPARK-8833 states I am waiting for getting enough usecase to float in before I take a final call. We're adding to that. In the meantime, can you clarify the confusion? Does direct streaming persist the progress information into DStream checkpoints or does it not? 
If it does, why is it that we're not seeing that happen? Our consumers start with auto.offset.reset=greatest and that causes them to read from the first offset of data that is written to Kafka *after* the consumer has been restarted, meaning we're missing data that had come in while the consumer was down. If the progress is stored in DStream checkpoints, we want to know a) how to cause that to work for us and b) where the said checkpointing data is stored physically. Conversely, if this is not accurate, then is our only choice to manually persist the offsets into Zookeeper? If that is the case then a) we'd like a clear, more complete code sample to be published, since the one in the Kafka streaming guide is incomplete (it lacks the actual lines of code persisting the offsets) and b) we'd like to request that SPARK-8833 be revisited as a feature worth implementing in the API. Thanks.
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14645992#comment-14645992 ] Dmitry Goldenberg commented on SPARK-9434: -- @Sean Owen: Per your point about placing this discussion into the mailing list, I [had placed my question there|http://apache-spark-user-list.1001560.n3.nabble.com/What-happens-when-a-streaming-consumer-job-is-killed-then-restarted-tt23348.html] and it has remained unanswered there since June 16th. Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API - Key: SPARK-9434 URL: https://issues.apache.org/jira/browse/SPARK-9434 Project: Spark Issue Type: Improvement Components: Documentation, Examples, Streaming Affects Versions: 1.4.1 Reporter: Dmitry Goldenberg We've been getting some mixed information regarding how to cause our direct streaming consumers to resume processing from where they left off in terms of the Kafka offsets. On the one hand side, we're hearing If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down). On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which are marked as won't fix which seem to ask for the functionality we need, with comments like I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need. The use-case is actually very clear and doesn't ask for confusing semantics. An API option to resume reading where you left off, in addition to the smallest or greatest auto.offset.reset should be *very* useful, probably for quite a few folks. We're asking for this as an enhancement request. SPARK-8833 states I am waiting for getting enough usecase to float in before I take a final call. We're adding to that. In the meantime, can you clarify the confusion? Does direct streaming persist the progress information into DStream checkpoints or does it not? If it does, why is it that we're not seeing that happen? Our consumers start with auto.offset.reset=greatest and that causes them to read from the first offset of data that is written to Kafka *after* the consumer has been restarted, meaning we're missing data that had come in while the consumer was down. If the progress is stored in DStream checkpoints, we want to know a) how to cause that to work for us and b) where the said checkpointing data is stored physically. Conversely, if this is not accurate, then is our only choice to manually persist the offsets into Zookeeper? If that is the case then a) we'd like a clear, more complete code sample to be published, since the one in the Kafka streaming guide is incomplete (it lacks the actual lines of code persisting the offsets) and b) we'd like to request that SPARK-8833 be revisited as a feature worth implementing in the API. Thanks. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14645992#comment-14645992 ] Dmitry Goldenberg edited comment on SPARK-9434 at 7/29/15 12:50 PM: [~sowen] Per your point about placing this discussion into the mailing list, I [had placed my question there|http://apache-spark-user-list.1001560.n3.nabble.com/What-happens-when-a-streaming-consumer-job-is-killed-then-restarted-tt23348.html] and it has remained unanswered there since June 16th. was (Author: dgoldenberg): @Sean Owen: Per your point about placing this discussion into the mailing list, I [had placed my question there|http://apache-spark-user-list.1001560.n3.nabble.com/What-happens-when-a-streaming-consumer-job-is-killed-then-restarted-tt23348.html] and it has remained unanswered there since June 16th. Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API - Key: SPARK-9434 URL: https://issues.apache.org/jira/browse/SPARK-9434 Project: Spark Issue Type: Improvement Components: Documentation, Examples, Streaming Affects Versions: 1.4.1 Reporter: Dmitry Goldenberg We've been getting some mixed information regarding how to cause our direct streaming consumers to resume processing from where they left off in terms of the Kafka offsets. On the one hand side, we're hearing If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka. (where T is the interval of time during which the consumer was down). On the other hand, there are tickets such as SPARK-6249 and SPARK-8833 which are marked as won't fix which seem to ask for the functionality we need, with comments like I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need. The use-case is actually very clear and doesn't ask for confusing semantics. An API option to resume reading where you left off, in addition to the smallest or greatest auto.offset.reset should be *very* useful, probably for quite a few folks. We're asking for this as an enhancement request. SPARK-8833 states I am waiting for getting enough usecase to float in before I take a final call. We're adding to that. In the meantime, can you clarify the confusion? Does direct streaming persist the progress information into DStream checkpoints or does it not? If it does, why is it that we're not seeing that happen? Our consumers start with auto.offset.reset=greatest and that causes them to read from the first offset of data that is written to Kafka *after* the consumer has been restarted, meaning we're missing data that had come in while the consumer was down. If the progress is stored in DStream checkpoints, we want to know a) how to cause that to work for us and b) where the said checkpointing data is stored physically. Conversely, if this is not accurate, then is our only choice to manually persist the offsets into Zookeeper? 
If that is the case then a) we'd like a clear, more complete code sample to be published, since the one in the Kafka streaming guide is incomplete (it lacks the actual lines of code persisting the offsets) and b) we'd like to request that SPARK-8833 be revisited as a feature worth implementing in the API. Thanks.
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14646021#comment-14646021 ] Dmitry Goldenberg commented on SPARK-9434:
--
Great, thanks, Sean. Perhaps Cody and/or Tathagata can weigh in with the details of how checkpointing would be enabled for direct streaming. I agree that if that is the case, this would deserve a clarification in the Kafka integration guide, preferably with a code sample.

A fuller example of the manual ZK offset update would be great, whether we have to go the "else" route you mentioned or not. We may need to do that eventually anyway in order to enable direct-streaming monitoring, which relies on ZK offsets. For now, though, all I'm trying to do is get the checkpoint-based progress persistence to work, so a clear example would be great (a sketch of the checkpoint-recovery pattern follows the issue description below).

By the way, how does the checkpointing persistence work? Where on a worker node would I be able to find this checkpointing datastore, and is it easily browsable? Thanks.

Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
-
Key: SPARK-9434
URL: https://issues.apache.org/jira/browse/SPARK-9434
Project: Spark
Issue Type: Improvement
Components: Documentation, Examples, Streaming
Affects Versions: 1.4.1
Reporter: Dmitry Goldenberg

We've been getting some mixed information regarding how to cause our direct streaming consumers to resume processing from where they left off in terms of the Kafka offsets. On the one hand, we're hearing: "If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka." (where T is the interval of time during which the consumer was down).

On the other hand, there are tickets such as SPARK-6249 and SPARK-8833, marked as "Won't Fix", which seem to ask for the functionality we need, with comments like "I don't want to add more config options with confusing semantics around what is being used for the system of record for offsets, I'd rather make it easy for people to explicitly do what they need."

The use case is actually very clear and doesn't ask for confusing semantics. An API option to resume reading where you left off, in addition to the smallest or greatest auto.offset.reset, should be *very* useful, probably for quite a few folks. We're asking for this as an enhancement request. SPARK-8833 states "I am waiting for getting enough usecase to float in before I take a final call." We're adding to that.

In the meantime, can you clarify the confusion? Does direct streaming persist the progress information into DStream checkpoints or does it not? If it does, why is it that we're not seeing that happen? Our consumers start with auto.offset.reset=greatest and that causes them to read from the first offset of data that is written to Kafka *after* the consumer has been restarted, meaning we're missing data that had come in while the consumer was down.

If the progress is stored in DStream checkpoints, we want to know a) how to cause that to work for us and b) where the said checkpointing data is stored physically. Conversely, if this is not accurate, then is our only choice to manually persist the offsets into Zookeeper? If that is the case, then a) we'd like a clear, more complete code sample to be published, since the one in the Kafka streaming guide is incomplete (it lacks the actual lines of code persisting the offsets), and b) we'd like to request that SPARK-8833 be revisited as a feature worth implementing in the API. Thanks.
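As a point of reference for the checkpointing question above, a minimal sketch of the getOrCreate pattern from the streaming programming guide for this Spark version; the checkpoint directory, app name, and batch interval here are placeholders, not values from this ticket:

{code}
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class CheckpointedDriver {
  public static void main(String[] args) throws Exception {
    // Placeholder: any durable path (e.g. HDFS) reachable by the driver and executors.
    final String checkpointDir = "hdfs:///user/myapp/checkpoints";

    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        SparkConf conf = new SparkConf().setAppName("direct-kafka-app");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        jssc.checkpoint(checkpointDir);
        // The direct Kafka stream and the rest of the DStream graph must be
        // built here, inside the factory, so they can be restored from the
        // checkpoint on restart.
        return jssc;
      }
    };

    // Fresh start: create() is called. Restart: the context, including the
    // Kafka offsets tracked in the checkpoint, is rebuilt from checkpointDir.
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, factory);
    jssc.start();
    jssc.awaitTermination();
  }
}
{code}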
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14646033#comment-14646033 ] Dmitry Goldenberg commented on SPARK-9434:
--
Ah. I think this is making sense then :) http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14646045#comment-14646045 ] Dmitry Goldenberg commented on SPARK-9434:
--
Perhaps the Kafka user guide could reference this part of the doc? It'd make it a lot easier to make the connection...
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14646047#comment-14646047 ] Dmitry Goldenberg commented on SPARK-9434:
--
Yup, Cody, just connected the dots with the checkpointing doc. The Kafka and checkpointing docs seem disjoint so it's not trivial to make the connection from the get-go. Thanks.
[jira] [Commented] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645965#comment-14645965 ] Dmitry Goldenberg commented on SPARK-9434:
--
[~sowen]: Putting this here was the only way to start getting true attention and more definitive answers. We *are* using Approach 2, the direct streaming. As a slight side note, there are no receivers there, so the term "direct receivers" that you seem to be using doesn't apply.

That said, the doc states: "Offsets are tracked by Spark Streaming within its checkpoints." Can you confirm that this storage of offsets is *only* done for the _smallest_ and _greatest_ values of auto.offset.reset? This really appears to be the case. If so, I'll need to use a manual update of the offsets so that we can ascertain where to resume processing, and I'm filing an enhancement request for this being an option in the API.

From the sample perspective, at least the ... in the below needs to be filled in:

{code}
foreachRDD(
  new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws IOException {
      for (OffsetRange o : offsetRanges.get()) {
        System.out.println(
          o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
        );
      }
      ...
      return null;
    }
  }
);
{code}
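For reference, a sketch of how the offset ranges exposed by the direct stream's RDDs can be obtained and handed off for manual persistence. The OffsetStore interface below is a hypothetical placeholder for whatever actually writes to ZooKeeper (or another durable store); it is not a Spark API:

{code}
import java.io.IOException;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

// Hypothetical store; a real implementation might write
// (topic, partition) -> untilOffset into ZooKeeper.
interface OffsetStore {
  void save(String topic, int partition, long untilOffset) throws IOException;
}

public class OffsetTrackingExample {
  public static void attach(JavaPairInputDStream<String, String> directKafkaStream,
                            final OffsetStore store) {
    directKafkaStream.foreachRDD(
      new Function<JavaPairRDD<String, String>, Void>() {
        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws IOException {
          // RDDs produced by the direct stream carry their Kafka offset ranges.
          OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

          // ... process the batch here, then record how far we got ...

          for (OffsetRange o : ranges) {
            store.save(o.topic(), o.partition(), o.untilOffset());
          }
          return null;
        }
      });
  }
}
{code}

Recording untilOffset only after the batch has been processed gives at-least-once behavior on restart; recording it before processing would give at-most-once.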
[jira] [Comment Edited] (SPARK-9434) Need how-to for resuming direct Kafka streaming consumers where they had left off before getting terminated, OR actual support for that mode in the Streaming API
[ https://issues.apache.org/jira/browse/SPARK-9434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645971#comment-14645971 ] Dmitry Goldenberg edited comment on SPARK-9434 at 7/29/15 12:30 PM:
--
[~tdas]: Could you weigh in on this discussion? In our previous exchanges, we talked about: "If you are restarting the streaming app with Direct kafka from the checkpoint information (that is, restarting), then the last read offsets are automatically recovered, and the data will start processing from that offset. All the N records added in T will stay buffered in Kafka." (where T is the interval of time during which the consumer was down)

Is it indeed the case that the *last read offsets are automatically recovered*? I am not seeing that. Our consumers resume from the first data that's added to Kafka *after* the consumers have been restarted, leading to the loss of data added *while the consumers were down*.
[jira] [Created] (SPARK-8526) Provide a way to define custom metrics and custom metric sink in Spark
Dmitry Goldenberg created SPARK-8526:
-
Summary: Provide a way to define custom metrics and custom metric sink in Spark
Key: SPARK-8526
URL: https://issues.apache.org/jira/browse/SPARK-8526
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 1.4.0
Reporter: Dmitry Goldenberg

There are a number of user questions posted to the Spark mailing list regarding how to instrument custom metrics within Spark applications:
* how to define custom metrics
* how to define a custom metrics sink, such as a custom metrics servlet
* how to call back into the consumers from the sink to report on actions potentially taken by the metrics sink

Some of this may be a documentation issue. Currently the Metrics section under https://spark.apache.org/docs/latest/monitoring.html is rather sparse.
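On the sink side, the built-in sinks can already be wired up through conf/metrics.properties; a minimal sketch along the lines of the stock metrics.properties.template (the CSV directory below is a placeholder), roughly the level of example the monitoring page could show:

{code}
# Enable the console and CSV sinks for all instances
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds

*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=1
*.sink.csv.unit=minutes
*.sink.csv.directory=/tmp/spark-metrics
{code}

Defining a truly custom source or sink, and calling back into the application from it, is the part that remains undocumented and is what this ticket asks for.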
[jira] [Commented] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14516241#comment-14516241 ] Dmitry Goldenberg commented on SPARK-7154:
--
I've added this info to the 'mirror' ticket I filed in Phoenix's JIRA, [PHOENIX-1926|https://issues.apache.org/jira/browse/PHOENIX-1926]:

It looks like I got this to work, with the following in my spark-submit invocation:

{code}
./bin/spark-submit \
  --driver-class-path $HBASE_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar \
  --driver-java-options -Dspark.driver.extraClassPath=$HBASE_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar \
  --class com.myco.Driver \
  --master local[*] \
  /mnt/data/myco.jar \
{code}

The crucial part to getting this to work was the first parameter, --driver-class-path. Things work with just that and without spark.driver.extraClassPath. Things do not work with spark.driver.extraClassPath but no --driver-class-path, and of course they don't work with both of them missing from the invocation.

I also have the classes from Phoenix's dependency HBase and Hadoop jars rolled into my driver jar. I tested with them in the job jar and without; the error goes away in either case as long as the protocol jar is on the driver class path. Since I've only tried this in local mode, it's not yet clear to me whether spark.driver.extraClassPath and/or rolling Phoenix's dependency HBase and Hadoop jars into the job jar would be required in other modes. But the good news is that there's clearly a 'mojo' for getting this to work so that one gets no IllegalAccessError anymore.

Spark distro appears to be pulling in incorrect protobuf classes
-
Key: SPARK-7154
URL: https://issues.apache.org/jira/browse/SPARK-7154
Project: Spark
Issue Type: Bug
Components: Build
Affects Versions: 1.3.0
Reporter: Dmitry Goldenberg
Attachments: in-google-protobuf-2.5.0.zip, in-spark-1.3.1-local-build.zip, spark-1.3.1-local-build.txt

If you download Spark via the site https://spark.apache.org/downloads.html (for example, I chose http://www.apache.org/dyn/closer.cgi/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.4.tgz), you may see incompatibility with other libraries due to incorrect protobuf classes. I'm seeing such a case in my Spark Streaming job, which attempts to use Apache Phoenix to update records in HBase. The job is built with a protobuf 2.5.0 dependency. However, at runtime Spark's classes take precedence in class loading, and that is causing exceptions such as the following:

{code}
java.util.concurrent.ExecutionException: java.lang.IllegalAccessError: com/google/protobuf/HBaseZeroCopyByteString
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hadoop.hbase.client.HTable.coprocessorService(HTable.java:1620)
    at org.apache.hadoop.hbase.client.HTable.coprocessorService(HTable.java:1577)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.metaDataCoprocessorExec(ConnectionQueryServicesImpl.java:1007)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.getTable(ConnectionQueryServicesImpl.java:1257)
    at org.apache.phoenix.schema.MetaDataClient.updateCache(MetaDataClient.java:350)
    at org.apache.phoenix.schema.MetaDataClient.updateCache(MetaDataClient.java:311)
    at org.apache.phoenix.schema.MetaDataClient.updateCache(MetaDataClient.java:307)
    at org.apache.phoenix.compile.FromCompiler$BaseColumnResolver.createTableRef(FromCompiler.java:333)
    at org.apache.phoenix.compile.FromCompiler$SingleTableColumnResolver.init(FromCompiler.java:237)
    at org.apache.phoenix.compile.FromCompiler$SingleTableColumnResolver.init(FromCompiler.java:231)
    at org.apache.phoenix.compile.FromCompiler.getResolverForMutation(FromCompiler.java:207)
    at org.apache.phoenix.compile.UpsertCompiler.compile(UpsertCompiler.java:248)
    at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:503)
    at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:494)
    at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:295)
    at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:288)
    at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
    at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:287)
    at org.apache.phoenix.jdbc.PhoenixStatement.execute(PhoenixStatement.java:219)
    at
{code}
[jira] [Commented] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14516242#comment-14516242 ] Dmitry Goldenberg commented on SPARK-7154:
--
Could this possibly be documented in the Spark doc set somewhere as part of resolving this ticket? Thanks.
[jira] [Commented] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14514303#comment-14514303 ] Dmitry Goldenberg commented on SPARK-7154:
--
Thanks, Sean. What is the right way of setting spark.executor.userClassPathFirst? If I try to set it on SparkConf programmatically, it seems to be ignored:

{code}
SparkConf sparkConf = new SparkConf().setAppName(appName);
sparkConf.set("spark.executor.userClassPathFirst", "true");
{code}

Is --driver-java-options -Dspark.executor.userClassPathFirst=true the recommended approach? I'm now wrapping all of Phoenix's HBase and Hadoop dependency classes into the Spark job jar, but I want to make sure they take precedence. Spark's assembly jar has its own protobuf and Hadoop classes, so perhaps I'm clashing with those.
[jira] [Commented] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14514314#comment-14514314 ] Dmitry Goldenberg commented on SPARK-7154:
--
Trying to use --driver-java-options -Dspark.executor.userClassPathFirst=true is having an effect, although not a positive one:

{code}
15/04/27 11:36:31 ERROR scheduler.JobScheduler: Error running job streaming job 143014899 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: cannot assign instance of scala.None$ to field org.apache.spark.scheduler.Task.metrics of type scala.Option in instance of org.apache.spark.scheduler.ResultTask
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}
[jira] [Commented] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513303#comment-14513303 ] Dmitry Goldenberg commented on SPARK-7154:
--
Not sure if this may be an HBase issue... As per http://www.cloudera.com/content/cloudera/en/documentation/cloudera-search/v1-latest/Cloudera-Search-Release-Notes/csrn_known_issues_current.html, I'm now running with HADOOP_CLASSPATH pointing at my hbase-protocol jar: export HADOOP_CLASSPATH=$HBASE_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar, and am still getting errors, e.g.:

{code}
java.util.concurrent.ExecutionException: java.lang.IllegalAccessError: class com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass com.google.protobuf.LiteralByteString
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hadoop.hbase.client.HTable.coprocessorService(HTable.java:1620)
    at org.apache.hadoop.hbase.client.HTable.coprocessorService(HTable.java:1577)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.metaDataCoprocessorExec(ConnectionQueryServicesImpl.java:1007)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.getTable(ConnectionQueryServicesImpl.java:1257)
    at org.apache.phoenix.schema.MetaDataClient.updateCache(MetaDataClient.java:350)
    at org.apache.phoenix.schema.MetaDataClient.updateCache(MetaDataClient.java:311)
    at org.apache.phoenix.schema.MetaDataClient.updateCache(MetaDataClient.java:307)
    at org.apache.phoenix.compile.FromCompiler$BaseColumnResolver.createTableRef(FromCompiler.java:333)
    at org.apache.phoenix.compile.FromCompiler$SingleTableColumnResolver.init(FromCompiler.java:237)
    at org.apache.phoenix.compile.FromCompiler$SingleTableColumnResolver.init(FromCompiler.java:231)
    at org.apache.phoenix.compile.FromCompiler.getResolverForMutation(FromCompiler.java:207)
    at org.apache.phoenix.compile.UpsertCompiler.compile(UpsertCompiler.java:248)
    at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:503)
    at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:494)
    at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:295)
    at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:288)
    at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
    at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:287)
    at org.apache.phoenix.jdbc.PhoenixStatement.execute(PhoenixStatement.java:219)
    at org.apache.phoenix.jdbc.PhoenixPreparedStatement.execute(PhoenixPreparedStatement.java:174)
    at org.apache.phoenix.jdbc.PhoenixPreparedStatement.execute(PhoenixPreparedStatement.java:179)
{code}
[jira] [Comment Edited] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513217#comment-14513217 ] Dmitry Goldenberg edited comment on SPARK-7154 at 4/26/15 7:37 PM:
---
The attached output for building Spark 1.3.1 includes "[INFO] Including com.google.protobuf:protobuf-java:jar:2.5.0 in the shaded jar." for building the assembly. Yet the classes in protobuf are different.
[jira] [Updated] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Goldenberg updated SPARK-7154:
-
Attachment: spark-1.3.1-local-build.txt

This Maven output includes "[INFO] Including com.google.protobuf:protobuf-java:jar:2.5.0 in the shaded jar." for building the assembly. Yet the classes in protobuf are different.
[jira] [Commented] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513218#comment-14513218 ] Dmitry Goldenberg commented on SPARK-7154:
--
This is really a blocker for me; I can't work around either issue. Any ideas would be appreciated!
[jira] [Updated] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Goldenberg updated SPARK-7154: - Priority: Critical (was: Major)
[jira] [Created] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
Dmitry Goldenberg created SPARK-7154: Summary: Spark distro appears to be pulling in incorrect protobuf classes Key: SPARK-7154 URL: https://issues.apache.org/jira/browse/SPARK-7154 Project: Spark Issue Type: Bug Components: Build Affects Versions: 1.3.1 Reporter: Dmitry Goldenberg
[jira] [Commented] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513208#comment-14513208 ] Dmitry Goldenberg commented on SPARK-7154: -- As a workaround, I just tried setting the following:
{code}
SparkConf sparkConf = new SparkConf().setAppName(appName);
sparkConf.set("spark.executor.userClassPathFirst", "true");
{code}
running in standalone mode, and started getting the following exceptions from Spark while trying to run the jobs with spark-submit:
{code}
15/04/26 15:19:00 ERROR scheduler.JobScheduler: Error running job streaming job 143007594 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 9, localhost): java.lang.ClassCastException: cannot assign instance of scala.None$ to field org.apache.spark.scheduler.Task.metrics of type scala.Option in instance of org.apache.spark.scheduler.ResultTask
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}
I need a solution for either side of the issue. Either Spark needs to pull in the right protobuf classes, or this new exception needs to be worked around; otherwise I'm basically dead in the water.
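For what it's worth, a "cannot assign instance of scala.None$ to field ... of type scala.Option" failure is typically a sign of the same classes being defined by two different classloaders once userClassPathFirst is enabled, often because the application jar also bundles Spark or Scala classes. A hedged sketch (not from this ticket; the class names are only examples) that logs the defining loaders can confirm whether they diverge:
{code}
// Hedged sketch: compare the classloaders that defined a Scala class and a Spark class.
// If child-first loading produces duplicates, the printed loaders will differ between runs
// of this code on the driver classpath versus inside an executor task.
public class LoaderCheck {
    public static void main(String[] args) throws Exception {
        String[] names = {"scala.Option", "scala.None$", "org.apache.spark.scheduler.Task"};
        for (String name : names) {
            Class<?> c = Class.forName(name);
            System.out.println(name + " -> " + c.getClassLoader());
        }
    }
}
{code}
As a plain main() this only shows the launcher's view; calling the same loop from inside a task (for example, in a foreachPartition) would show the executor-side loaders that matter for the deserialization failure above.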
[jira] [Updated] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Goldenberg updated SPARK-7154: - Attachment: in-spark-1.3.1-local-build.zip in-google-protobuf-2.5.0.zip I'm attaching the com.google.protobuf classes which appear to be different in the stock protobuf 2.5.0 jar versus the classes that end up in the Spark assembly jar. In this case, I built Spark 1.3.1 locally. The Shade plugin claims that it's pulling in protobuf 2.5.0, but something is different.
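One way to make the comparison described in this update reproducible, offered only as an illustrative sketch (the jar paths and the class entry are placeholders, not taken from the attachments), is to hash the same class entry in both jars and check whether the bytes match:
{code}
import java.io.InputStream;
import java.security.MessageDigest;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper: compares one class entry across two jars, e.g. the stock
// protobuf-java-2.5.0.jar versus a locally built Spark assembly jar.
public class CompareClassInJars {
    public static void main(String[] args) throws Exception {
        String entry = "com/google/protobuf/LiteralByteString.class"; // example entry
        System.out.println("jar 1: " + sha1(args[0], entry));
        System.out.println("jar 2: " + sha1(args[1], entry));
    }

    static String sha1(String jarPath, String entryName) throws Exception {
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry e = jar.getJarEntry(entryName);
            if (e == null) return "<entry not found>";
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            try (InputStream in = jar.getInputStream(e)) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = in.read(buf)) != -1) md.update(buf, 0, n);
            }
            StringBuilder sb = new StringBuilder();
            for (byte b : md.digest()) sb.append(String.format("%02x", b));
            return sb.toString();
        }
    }
}
{code}
If the digests differ for classes that the Shade plugin claims came from the same protobuf 2.5.0 artifact, that would corroborate the mismatch described above.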
[jira] [Updated] (SPARK-7154) Spark distro appears to be pulling in incorrect protobuf classes
[ https://issues.apache.org/jira/browse/SPARK-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Goldenberg updated SPARK-7154: - Affects Version/s: 1.3.0 (was: 1.3.1)