[jira] [Resolved] (SPARK-33596) NPE when there is no watermark metrics
[ https://issues.apache.org/jira/browse/SPARK-33596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu resolved SPARK-33596. --- Resolution: Won't Fix > NPE when there is no watermark metrics > -- > > Key: SPARK-33596 > URL: https://issues.apache.org/jira/browse/SPARK-33596 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Genmao Yu >Priority: Major > > We parse the process timestamp at > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala#L153, > but it will throw an NPE when there are no event time metrics. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33596) NPE when there is no watermark metrics
[ https://issues.apache.org/jira/browse/SPARK-33596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-33596: -- Summary: NPE when there is no watermark metrics (was: NPE when there is no EventTime) > NPE when there is no watermark metrics > -- > > Key: SPARK-33596 > URL: https://issues.apache.org/jira/browse/SPARK-33596 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Genmao Yu >Priority: Major > > We parse the process timestamp at > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala#L153, > but it will throw an NPE when there are no event time metrics. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33596) NPE when there is no EventTime
Genmao Yu created SPARK-33596: - Summary: NPE when there is no EventTime Key: SPARK-33596 URL: https://issues.apache.org/jira/browse/SPARK-33596 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 3.1.0 Reporter: Genmao Yu We parse the process timestamp at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala#L153, but it will throw an NPE when there are no event time metrics. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
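A minimal Scala sketch of the defensive parse implied above, assuming the event-time metrics arrive as a java.util.Map with an optional "watermark" entry; the helper name and timestamp format are assumptions, not the actual StreamingQueryStatisticsPage code:

{code:scala}
import java.text.SimpleDateFormat
import java.util.TimeZone

// Hypothetical helper: only parse the watermark timestamp when the metric exists.
def parseWatermark(eventTime: java.util.Map[String, String]): Option[Long] = {
  // Queries without an event-time column have no "watermark" entry, so get() returns null.
  Option(eventTime.get("watermark")).map { ts =>
    val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // assumed ISO-8601 layout
    format.setTimeZone(TimeZone.getTimeZone("UTC"))
    format.parse(ts).getTime
  }
}
{code}

Returning an Option lets the statistics page simply skip the watermark chart instead of dereferencing a missing value.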
[jira] [Commented] (SPARK-31928) Flaky test: StreamingDeduplicationSuite.test no-data flag
[ https://issues.apache.org/jira/browse/SPARK-31928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130471#comment-17130471 ] Genmao Yu commented on SPARK-31928: --- There is a related pr: [https://github.com/apache/spark/pull/28391] > Flaky test: StreamingDeduplicationSuite.test no-data flag > - > > Key: SPARK-31928 > URL: https://issues.apache.org/jira/browse/SPARK-31928 > Project: Spark > Issue Type: Bug > Components: Structured Streaming, Tests >Affects Versions: 3.1.0 >Reporter: Gabor Somogyi >Priority: Major > > Test failed: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123621/ > {code:java} > [info] with spark.sql.streaming.noDataMicroBatches.enabled = false > [info] Assert on query failed: : > [info] Assert on query failed: > [info] > [info] == Progress == > [info] > StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@372edb19,Map(spark.sql.streaming.noDataMicroBatches.enabled > -> false),null) > [info] AddData to MemoryStream[value#437541]: 10,11,12,13,14,15 > [info] CheckAnswer: [10],[11],[12],[13],[14],[15] > [info] AssertOnQuery(, Check total state rows = List(6), > updated state rows = List(6)) > [info] AddData to MemoryStream[value#437541]: 25 > [info] CheckNewAnswer: [25] > [info] AssertOnQuery(, Check total state rows = List(7), > updated state rows = List(1)) > [info] => AssertOnQuery(, ) > [info] > [info] == Stream == > [info] Output Mode: Append > [info] Stream state: {MemoryStream[value#437541]: 1} > [info] Thread state: alive > [info] Thread stack trace: java.lang.Thread.sleep(Native Method) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:241) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$1375/882607691.apply$mcZ$sp(Unknown > Source) > [info] > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185) > [info] > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334) > [info] > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245) > [info] > [info] > [info] == Sink == > [info] 0: [11] [14] [13] [10] [15] [12] > [info] 1: [25] > [info] > [info] > [info] == Plan == > [info] == Parsed Logical Plan == > [info] WriteToDataSourceV2 > org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@158ccd13 > [info] +- Project [cast(eventTime#437544-T1ms as bigint) AS > eventTime#437548L] > [info] +- Deduplicate [value#437541, eventTime#437544-T1ms] > [info] +- EventTimeWatermark eventTime#437544: timestamp, 10 seconds > [info]+- Project [value#437541, cast(value#437541 as timestamp) > AS eventTime#437544] > [info] +- StreamingDataSourceV2Relation [value#437541], > org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@1802eea6, > MemoryStream[value#437541], 0, 1 > [info] > [info] == Analyzed Logical Plan == > [info] > [info] WriteToDataSourceV2 > org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@158ccd13 > [info] +- Project [cast(eventTime#437544-T1ms as bigint) AS > eventTime#437548L] > [info] +- Deduplicate [value#437541, eventTime#437544-T1ms] > [info] +- EventTimeWatermark eventTime#437544: timestamp, 10 seconds > [info]+- Project [value#437541, 
cast(value#437541 as timestamp) > AS eventTime#437544] > [info] +- StreamingDataSourceV2Relation [value#437541], > org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@1802eea6, > MemoryStream[value#437541], 0, 1 > [info] > [info] == Optimized Logical Plan == > [info] WriteToDataSourceV2 > org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@158ccd13 > [info] +- Project [cast(eventTime#437544-T1ms as bigint) AS > eventTime#437548L] > [info] +- Deduplicate [value#437541, eventTime#437544-T1ms] > [info] +- EventTimeWatermark eventTime#437544: timestamp, 10 seconds > [info]+- Project [value#437541, cast(value#437541 as timestamp) > AS eventTime#437544] > [info] +- StreamingDataSourceV2Relation [value#437541], > org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@1802eea6, > MemoryStream[value#437541], 0, 1 > [info] > [info] == Physical Plan == > [info] WriteToDataSourceV2 > org.apache.spark.sql.execution.streaming.sources
[jira] [Created] (SPARK-31953) Add Spark Structured Streaming History Server Support
Genmao Yu created SPARK-31953: - Summary: Add Spark Structured Streaming History Server Support Key: SPARK-31953 URL: https://issues.apache.org/jira/browse/SPARK-31953 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 2.4.6, 3.0.0 Reporter: Genmao Yu -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31913) StackOverflowError in FileScanRDD
Genmao Yu created SPARK-31913: - Summary: StackOverflowError in FileScanRDD Key: SPARK-31913 URL: https://issues.apache.org/jira/browse/SPARK-31913 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.5, 3.0.0 Reporter: Genmao Yu Reading from FileScanRDD may fail with a StackOverflowError in my environment: - There are a mass of empty files in the table partition. - `spark.sql.files.maxPartitionBytes` is set to a large value: 1024MB A quick workaround is to set `spark.sql.files.maxPartitionBytes` to a small value, like the default 128MB. A better way is to resolve the recursive calls in FileScanRDD. {code} java.lang.StackOverflowError at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.getSubject(Subject.java:297) at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:648) at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2828) at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2818) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2684) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38) at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:640) at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:148) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:143) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:326) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
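A hedged sketch of the "resolve the recursive calls" direction: a simplified iterator that advances over empty files with a loop instead of the hasNext/nextIterator mutual recursion. This is an illustration with invented names, not the actual FileScanRDD code.

{code:scala}
import java.util.NoSuchElementException

// Each "file" is opened lazily and may yield zero rows (an empty file).
class FileConcatIterator[T](files: Iterator[() => Iterator[T]]) extends Iterator[T] {
  private var current: Iterator[T] = Iterator.empty

  // Iterative advance: thousands of consecutive empty files no longer deepen the call stack.
  override def hasNext: Boolean = {
    while (!current.hasNext && files.hasNext) {
      current = files.next()() // open the next file reader; empty readers are skipped by the loop
    }
    current.hasNext
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("end of input")
    current.next()
  }
}
{code}

Until such a fix lands, the quoted workaround amounts to `spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")`, which keeps a single partition from accumulating a long run of empty files.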
[jira] [Updated] (SPARK-31677) Use KVStore to cache stream query progress
[ https://issues.apache.org/jira/browse/SPARK-31677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-31677: -- Environment: (was: 1. Streaming query progress information are cached twice in *StreamExecution* and *StreamingQueryStatusListener*. It is memory-wasting. We can make this two usage unified. 2. Use *KVStore* instead to cache streaming query progress information.) > Use KVStore to cache stream query progress > -- > > Key: SPARK-31677 > URL: https://issues.apache.org/jira/browse/SPARK-31677 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.5, 3.0.0 >Reporter: Genmao Yu >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31677) Use KVStore to cache stream query progress
[ https://issues.apache.org/jira/browse/SPARK-31677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-31677: -- Description: 1. Streaming query progress information is cached twice, in *StreamExecution* and *StreamingQueryStatusListener*. This wastes memory; we can unify these two usages. 2. Use *KVStore* instead to cache streaming query progress information. > Use KVStore to cache stream query progress > -- > > Key: SPARK-31677 > URL: https://issues.apache.org/jira/browse/SPARK-31677 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.5, 3.0.0 >Reporter: Genmao Yu >Priority: Major > > 1. Streaming query progress information is cached twice, in *StreamExecution* > and *StreamingQueryStatusListener*. This wastes memory; we can unify these > two usages. > 2. Use *KVStore* instead to cache streaming query progress information. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31677) Use KVStore to cache stream query progress
Genmao Yu created SPARK-31677: - Summary: Use KVStore to cache stream query progress Key: SPARK-31677 URL: https://issues.apache.org/jira/browse/SPARK-31677 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.4.5, 3.0.0 Environment: 1. Streaming query progress information is cached twice, in *StreamExecution* and *StreamingQueryStatusListener*. This wastes memory; we can unify these two usages. 2. Use *KVStore* instead to cache streaming query progress information. Reporter: Genmao Yu -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
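A minimal sketch of the KVStore idea, using Spark's org.apache.spark.util.kvstore classes with an invented entity type and key scheme; it only illustrates keeping progress events in one store that both the listener and the UI can read, not the eventual implementation.

{code:scala}
import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// Hypothetical entity: one entry per (queryId, batchId) progress update, keyed by "queryId-batchId".
class StreamingProgressEntry(val queryId: String, val batchId: Long, val progressJson: String) {
  @KVIndex def id: String = s"$queryId-$batchId"
}

val store = new InMemoryStore()
store.write(new StreamingProgressEntry("query-1", 0L, """{"numInputRows": 10}"""))

// Both the status listener and the streaming UI could read from this single store,
// instead of each keeping its own copy of recent progress objects.
val entry = store.read(classOf[StreamingProgressEntry], "query-1-0")
{code}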
[jira] [Created] (SPARK-31593) Remove unnecessary streaming query progress update
Genmao Yu created SPARK-31593: - Summary: Remove unnecessary streaming query progress update Key: SPARK-31593 URL: https://issues.apache.org/jira/browse/SPARK-31593 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.5, 3.0.0 Reporter: Genmao Yu The Structured Streaming progress reporter always reports an `empty` progress when there is no new data. By design, we should only provide such progress updates every 10s (default) when there is no new data. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
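To make the intended behavior concrete, here is a hedged sketch with invented names (the real logic lives in the progress reporter and may differ): report every batch that processed data, but emit a no-data progress at most once per interval.

{code:scala}
// Assumes the default 10s interval for no-data progress updates.
class NoDataProgressThrottle(intervalMs: Long = 10000L) {
  private var lastNoDataReportMs: Option[Long] = None

  def shouldReport(hasNewData: Boolean, nowMs: Long): Boolean = {
    if (hasNewData) {
      true // always report batches that processed data
    } else if (lastNoDataReportMs.forall(last => nowMs - last >= intervalMs)) {
      lastNoDataReportMs = Some(nowMs) // at most one "empty" progress per interval
      true
    } else {
      false
    }
  }
}
{code}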
[jira] [Created] (SPARK-29973) Use nano time to calculate 'processedRowsPerSecond' to avoid 'NaN'/'Infinity'
Genmao Yu created SPARK-29973: - Summary: Use nano time to calculate 'processedRowsPerSecond' to avoid 'NaN'/'Infinity' Key: SPARK-29973 URL: https://issues.apache.org/jira/browse/SPARK-29973 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 3.0.0 Reporter: Genmao Yu The {{"processingTimeSec"}} of a batch may be less than 1 millisecond. Since {{"processingTimeSec"}} is calculated in milliseconds, it then equals 0L. If there is no data in this batch, {{"processedRowsPerSecond"}} equals {{"0/0.0d"}}, i.e. {{"Double.NaN"}}. If there is some data in this batch, {{"processedRowsPerSecond"}} equals {{"N/0.0d"}}, i.e. {{"Double.Infinity"}}. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
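A minimal sketch of the proposed calculation, assuming nanosecond timestamps (for example from System.nanoTime); the method and parameter names are illustrative. With a strictly positive denominator the rate can no longer evaluate to NaN or Infinity.

{code:scala}
import java.util.concurrent.TimeUnit

// numRows divided by elapsed seconds, measured in nanoseconds and clamped to at least 1 ns.
def processedRowsPerSecond(numRows: Long, startNanos: Long, endNanos: Long): Double = {
  val elapsedNanos = math.max(endNanos - startNanos, 1L) // never 0, so never 0/0.0 or N/0.0
  numRows.toDouble / (elapsedNanos.toDouble / TimeUnit.SECONDS.toNanos(1))
}

// A sub-millisecond batch: 100 rows in 500,000 ns gives 200,000 rows/s instead of Infinity.
val rate = processedRowsPerSecond(100L, 0L, 500000L)
{code}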
[jira] [Updated] (SPARK-29683) Job failed due to executor failures all available nodes are blacklisted
[ https://issues.apache.org/jira/browse/SPARK-29683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-29683: -- Description: My streaming job will fail *due to executor failures all available nodes are blacklisted*. This exception is thrown only when all node is blacklisted: {code:java} def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= numClusterNodes val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ allocatorBlacklist.keySet {code} After diving into the code, I found some critical conditions not be handled properly: - unchecked `excludeNodes`: it comes from user config. If not set properly, it may lead to "currentBlacklistedYarnNodes.size >= numClusterNodes". For example, we may set some nodes not in Yarn cluster. {code:java} excludeNodes = (invalid1, invalid2, invalid3) clusterNodes = (valid1, valid2) {code} - `numClusterNodes` may equals 0: When HA Yarn failover, it will take some time for all NodeManagers to register ResourceManager again. In this case, `numClusterNode` may equals 0 or some other number, and Spark driver failed. - too strong condition check: Spark driver will fail as long as "currentBlacklistedYarnNodes.size >= numClusterNodes". This condition should not indicate a unrecovered fatal. For example, there are some NodeManagers restarting. So we can give some waiting time before job failed. was: My streaming job will fail *due to executor failures all available nodes are blacklisted*. This exception is thrown only when all node is blacklisted: {code} def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= numClusterNodes val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ allocatorBlacklist.keySet {code} After diving into the code, I found some critical conditions not be handle properly: - unchecked `excludeNodes`: it comes from user config. If not set properly, it may lead to "currentBlacklistedYarnNodes.size >= numClusterNodes". For example, we may set some nodes not in Yarn cluster. {code} excludeNodes = (invalid1, invalid2, invalid3) clusterNodes = (valid1, valid2) {code} - `numClusterNodes` may equals 0: When HA Yarn failover, it will take some time for all NodeManagers to register ResourceManager again. In this case, `numClusterNode` may equals 0 or some other number, and Spark driver failed. - too strong condition check: Spark driver will fail as long as "currentBlacklistedYarnNodes.size >= numClusterNodes". This condition should not indicate a unrecovered fatal. For example, there are some NodeManagers restarting. So we can give some waiting time before job failed. > Job failed due to executor failures all available nodes are blacklisted > --- > > Key: SPARK-29683 > URL: https://issues.apache.org/jira/browse/SPARK-29683 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 3.0.0 >Reporter: Genmao Yu >Priority: Major > > My streaming job will fail *due to executor failures all available nodes are > blacklisted*. This exception is thrown only when all node is blacklisted: > {code:java} > def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= > numClusterNodes > val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ > allocatorBlacklist.keySet > {code} > After diving into the code, I found some critical conditions not be handled > properly: > - unchecked `excludeNodes`: it comes from user config. If not set properly, > it may lead to "currentBlacklistedYarnNodes.size >= numClusterNodes". 
For > example, we may set some nodes not in Yarn cluster. > {code:java} > excludeNodes = (invalid1, invalid2, invalid3) > clusterNodes = (valid1, valid2) > {code} > - `numClusterNodes` may equals 0: When HA Yarn failover, it will take some > time for all NodeManagers to register ResourceManager again. In this case, > `numClusterNode` may equals 0 or some other number, and Spark driver failed. > - too strong condition check: Spark driver will fail as long as > "currentBlacklistedYarnNodes.size >= numClusterNodes". This condition should > not indicate a unrecovered fatal. For example, there are some NodeManagers > restarting. So we can give some waiting time before job failed. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-29683) Job failed due to executor failures all available nodes are blacklisted
Genmao Yu created SPARK-29683: - Summary: Job failed due to executor failures all available nodes are blacklisted Key: SPARK-29683 URL: https://issues.apache.org/jira/browse/SPARK-29683 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 3.0.0 Reporter: Genmao Yu My streaming job will fail *due to executor failures all available nodes are blacklisted*. This exception is thrown only when all nodes are blacklisted: {code} def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= numClusterNodes val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ allocatorBlacklist.keySet {code} After diving into the code, I found some critical conditions that are not handled properly: - unchecked `excludeNodes`: it comes from user config. If not set properly, it may lead to "currentBlacklistedYarnNodes.size >= numClusterNodes". For example, we may set some nodes that are not in the Yarn cluster. {code} excludeNodes = (invalid1, invalid2, invalid3) clusterNodes = (valid1, valid2) {code} - `numClusterNodes` may equal 0: when a HA Yarn failover happens, it takes some time for all NodeManagers to register with the ResourceManager again. In this case, `numClusterNodes` may equal 0 or some other number, and the Spark driver fails. - too strong a condition check: the Spark driver will fail as long as "currentBlacklistedYarnNodes.size >= numClusterNodes". This condition does not necessarily indicate an unrecoverable fatal error. For example, some NodeManagers may just be restarting. So we can give some waiting time before failing the job. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
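One way to realize the "give some waiting time" idea from the last point above, as a hedged sketch with invented names (the real check in the YARN allocator code differs): fail the application only when the all-nodes-blacklisted condition has persisted beyond a grace period.

{code:scala}
// Tracks how long "currentBlacklistedYarnNodes.size >= numClusterNodes" has been continuously true.
class AllNodesBlacklistedTracker(gracePeriodMs: Long) {
  private var firstObservedMs: Option[Long] = None

  def shouldFail(allNodesBlacklisted: Boolean, nowMs: Long): Boolean = {
    if (!allNodesBlacklisted) {
      firstObservedMs = None // condition cleared (e.g. NodeManagers re-registered), reset the timer
      false
    } else {
      if (firstObservedMs.isEmpty) firstObservedMs = Some(nowMs)
      nowMs - firstObservedMs.get >= gracePeriodMs // fail only after the grace period elapses
    }
  }
}
{code}

The other two points could be handled in the same spirit: intersect `excludeNodes` with the nodes actually reported by the cluster, and treat `numClusterNodes == 0` as "unknown" rather than "everything is blacklisted".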
[jira] [Created] (SPARK-29543) Support Structured Streaming UI
Genmao Yu created SPARK-29543: - Summary: Support Structured Streaming UI Key: SPARK-29543 URL: https://issues.apache.org/jira/browse/SPARK-29543 Project: Spark Issue Type: New Feature Components: Structured Streaming, Web UI Affects Versions: 3.0.0 Reporter: Genmao Yu Open this jira to support structured streaming UI -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-29438) Failed to get state store in stream-stream join
[ https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-29438: -- Description: Now, Spark use the `TaskPartitionId` to determine the StateStore path. {code:java} TaskPartitionId \ StateStoreVersion --> StoreProviderId -> StateStore StateStoreName/ {code} In spark stages, the task partition id is determined by the number of tasks. As we said the StateStore file path depends on the task partition id. So if stream-stream join task partition id is changed against last batch, it will get wrong StateStore data or fail with non-exist StateStore data. In some corner cases, it happened. Following is a sample pseudocode: {code:java} val df3 = streamDf1.join(streamDf2) val df5 = streamDf3.join(batchDf4) val df = df3.union(df5) df.writeStream...start() {code} A simplified DAG like this: {code:java} DataSourceV2Scan Scan Relation DataSourceV2Scan DataSourceV2Scan (streamDf3)| (streamDf1)(streamDf2) | | | | Exchange(200) Exchange(200) Exchange(200) Exchange(200) | | | | SortSort | | \ / \ / \/ \ / SortMergeJoinStreamingSymmetricHashJoin \ / \ / \ / Union {code} Stream-Steam join task Id will start from 200 to 399 as they are in the same stage with `SortMergeJoin`. But when there is no new incoming data in `streamDf3` in some batch, it will generate a empty LocalRelation, and then the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, Stream-Steam join task Id will start from 1 to 200. Finally, it will get wrong StateStore path through TaskPartitionId, and failed with error reading state store delta file. {code:java} LocalTableScan Scan Relation DataSourceV2Scan DataSourceV2Scan | | | | BroadcastExchange | Exchange(200) Exchange(200) | | | | | | | | \/ \ / \ /\ / BroadcastHashJoin StreamingSymmetricHashJoin \ / \ / \ / Union {code} In my job, I closed the auto BroadcastJoin feature (set spark.sql.autoBroadcastJoinThreshold=-1) to walk around this bug. We should make the StateStore path determinate but not depends on TaskPartitionId. was: Now, Spark use the `TaskPartitionId` to determine the StateStore path. {code:java} TaskPartitionId \ StateStoreVersion --> StoreProviderId -> StateStore StateStoreName/ {code} In spark stages, the task partition id is determined by the number of tasks. As we said the StateStore file path depends on the task partition id. So if stream-stream join task partition id is changed against last batch, it will get wrong StateStore data or fail with non-exist StateStore data. In some corner cases, it happened: {code:java} val df3 = streamDf1.join(streamDf2) val df5 = streamDf3.join(batchDf4) val df = df3.union(df5) df.writeStream...start() {code} A simplified DAG like this: {code:java} DataSourceV2Scan Scan Relation DataSourceV2Scan DataSourceV2Scan (streamDf3)| (streamDf1)(streamDf2) | | | | Exchange(200) Exchange(200) Exchange(200) Exchange(200) | | | | SortSort | | \ / \ / \/ \ / SortMergeJoinStreamingSymmetricHashJoin \ / \ / \ / Union {code} Stream-Steam join task Id will start from 200 to 399 as they are in the same stage with `SortMergeJoin`. But when there is no new incoming data in `streamDf3` in some batch, it will generate a empty LocalRelation, and then the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, Stream-Steam join task Id will start from 1 to 200. Finally, it will get wrong StateStore path through TaskPartitionId, and failed with er
[jira] [Commented] (SPARK-29438) Failed to get state store in stream-stream join
[ https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949461#comment-16949461 ] Genmao Yu commented on SPARK-29438: --- [~kabhwan] Yes, the whole statestore path is `CheckpointRoot/operatorId/taskPartitionId/version/`. The `taskPartitionId` may change in some corner case in subsequent batch, that is what I mean. > Failed to get state store in stream-stream join > --- > > Key: SPARK-29438 > URL: https://issues.apache.org/jira/browse/SPARK-29438 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.4 >Reporter: Genmao Yu >Priority: Critical > > Now, Spark use the `TaskPartitionId` to determine the StateStore path. > {code:java} > TaskPartitionId \ > StateStoreVersion --> StoreProviderId -> StateStore > StateStoreName/ > {code} > In spark stages, the task partition id is determined by the number of tasks. > As we said the StateStore file path depends on the task partition id. So if > stream-stream join task partition id is changed against last batch, it will > get wrong StateStore data or fail with non-exist StateStore data. In some > corner cases, it happened: > {code:java} > val df3 = streamDf1.join(streamDf2) > val df5 = streamDf3.join(batchDf4) > val df = df3.union(df5) > df.writeStream...start() > {code} > A simplified DAG like this: > {code:java} > DataSourceV2Scan Scan Relation DataSourceV2Scan DataSourceV2Scan > (streamDf3)| (streamDf1)(streamDf2) > | | | | > Exchange(200) Exchange(200) Exchange(200) Exchange(200) > | | | | >SortSort | | > \ / \ / > \/ \ / > SortMergeJoinStreamingSymmetricHashJoin > \ / >\ / > \ / > Union > {code} > Stream-Steam join task Id will start from 200 to 399 as they are in the same > stage with `SortMergeJoin`. But when there is no new incoming data in > `streamDf3` in some batch, it will generate a empty LocalRelation, and then > the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, > Stream-Steam join task Id will start from 1 to 200. Finally, it will get > wrong StateStore path through TaskPartitionId, and failed with error reading > state store delta file. > {code:java} > LocalTableScan Scan Relation DataSourceV2Scan DataSourceV2Scan > | | | | > BroadcastExchange | Exchange(200) Exchange(200) > | | | | > | | | | > \/ \ / >\ /\ / > BroadcastHashJoin StreamingSymmetricHashJoin > \ / >\ / > \ / > Union > {code} > In my job, I closed the auto BroadcastJoin feature (set > spark.sql.autoBroadcastJoinThreshold=-1) to walk around this bug. We should > make the StateStore path determinate but not depends on TaskPartitionId. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-29438) Failed to get state store in stream-stream join
[ https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949365#comment-16949365 ] Genmao Yu edited comment on SPARK-29438 at 10/11/19 11:15 AM: -- There are several optional alternatives to resolve this issue: * Adding some rules to make the stream-stream join task partition id more determinate. In above cases, we can reorder the LogicalPlan in Union, i.e. making`StreamingSymmetricHashJoin` prior to `SortMergeJoin/BroadcastHashJoin`. * As said in desc, the StateStore path should not depend on the TaskPartitionId. We may get the StateStore path from StateStoreCoordinator. But this may increase some RPC load in Driver. * Dynamically disable the `autoBroadcastJoin` in some rules. was (Author: unclegen): There are several optional alternatives to resolve this issue: * Adding some rules to make the stream-stream join task partition id more determinate. In above cases, we can reorder the LogicalPlan in Union, i.e. making`StreamingSymmetricHashJoin` prior to `SortMergeJoin/BroadcastHashJoin`. * As said in desc, the StateStore path should not depend on the TaskPartitionId. We may get the StateStore path from StateStoreCoordinator. But this may increase the RPC load in Driver. * Dynamically disable the `autoBroadcastJoin` in some rules. > Failed to get state store in stream-stream join > --- > > Key: SPARK-29438 > URL: https://issues.apache.org/jira/browse/SPARK-29438 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.4 >Reporter: Genmao Yu >Priority: Critical > > Now, Spark use the `TaskPartitionId` to determine the StateStore path. > {code:java} > TaskPartitionId \ > StateStoreVersion --> StoreProviderId -> StateStore > StateStoreName/ > {code} > In spark stages, the task partition id is determined by the number of tasks. > As we said the StateStore file path depends on the task partition id. So if > stream-stream join task partition id is changed against last batch, it will > get wrong StateStore data or fail with non-exist StateStore data. In some > corner cases, it happened: > {code:java} > val df3 = streamDf1.join(streamDf2) > val df5 = streamDf3.join(batchDf4) > val df = df3.union(df5) > df.writeStream...start() > {code} > A simplified DAG like this: > {code:java} > DataSourceV2Scan Scan Relation DataSourceV2Scan DataSourceV2Scan > (streamDf3)| (streamDf1)(streamDf2) > | | | | > Exchange(200) Exchange(200) Exchange(200) Exchange(200) > | | | | >SortSort | | > \ / \ / > \/ \ / > SortMergeJoinStreamingSymmetricHashJoin > \ / >\ / > \ / > Union > {code} > Stream-Steam join task Id will start from 200 to 399 as they are in the same > stage with `SortMergeJoin`. But when there is no new incoming data in > `streamDf3` in some batch, it will generate a empty LocalRelation, and then > the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, > Stream-Steam join task Id will start from 1 to 200. Finally, it will get > wrong StateStore path through TaskPartitionId, and failed with error reading > state store delta file. > {code:java} > LocalTableScan Scan Relation DataSourceV2Scan DataSourceV2Scan > | | | | > BroadcastExchange | Exchange(200) Exchange(200) > | | | | > | | | | > \/ \ / >\ /\ / > BroadcastHashJoin StreamingSymmetricHashJoin > \ / >\ / > \ / > Union > {code} > In my job, I closed the auto BroadcastJoin feature (set > spark.sql.autoBroadcastJoinThreshold=-1) to walk around this bug. 
We should > make the StateStore path determinate but not depends on TaskPartitionId. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional comm
[jira] [Comment Edited] (SPARK-29438) Failed to get state store in stream-stream join
[ https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949365#comment-16949365 ] Genmao Yu edited comment on SPARK-29438 at 10/11/19 11:15 AM: -- There are several optional alternatives to resolve this issue: * Adding some rules to make the stream-stream join task partition id more determinate. In above cases, we can reorder the LogicalPlan in Union, i.e. making`StreamingSymmetricHashJoin` prior to `SortMergeJoin/BroadcastHashJoin`. * As said in desc, the StateStore path should not depend on the TaskPartitionId. We may get the StateStore path from StateStoreCoordinator. But this may increase the RPC load in Driver. * Dynamically disable the `autoBroadcastJoin` in some rules. was (Author: unclegen): There are several optional alternatives to resolve this issue: * Adding some rules to make the stream-stream join task partition id more determinate. In above cases, we can reorder the LogicalPlan in Union, i.e. making`StreamingSymmetricHashJoin` prior to `SortMergeJoin/BroadcastHashJoin`. * As said in desc, the StateStore path should not depend on the TaskPartitionId. We may get the StateStore path from StateStoreCoordinator. But this may increase the RPC load in Driver. * Dynamically disable the `autoBroadcastJoin` in some rules. > Failed to get state store in stream-stream join > --- > > Key: SPARK-29438 > URL: https://issues.apache.org/jira/browse/SPARK-29438 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.4 >Reporter: Genmao Yu >Priority: Critical > > Now, Spark use the `TaskPartitionId` to determine the StateStore path. > {code:java} > TaskPartitionId \ > StateStoreVersion --> StoreProviderId -> StateStore > StateStoreName/ > {code} > In spark stages, the task partition id is determined by the number of tasks. > As we said the StateStore file path depends on the task partition id. So if > stream-stream join task partition id is changed against last batch, it will > get wrong StateStore data or fail with non-exist StateStore data. In some > corner cases, it happened: > {code:java} > val df3 = streamDf1.join(streamDf2) > val df5 = streamDf3.join(batchDf4) > val df = df3.union(df5) > df.writeStream...start() > {code} > A simplified DAG like this: > {code:java} > DataSourceV2Scan Scan Relation DataSourceV2Scan DataSourceV2Scan > (streamDf3)| (streamDf1)(streamDf2) > | | | | > Exchange(200) Exchange(200) Exchange(200) Exchange(200) > | | | | >SortSort | | > \ / \ / > \/ \ / > SortMergeJoinStreamingSymmetricHashJoin > \ / >\ / > \ / > Union > {code} > Stream-Steam join task Id will start from 200 to 399 as they are in the same > stage with `SortMergeJoin`. But when there is no new incoming data in > `streamDf3` in some batch, it will generate a empty LocalRelation, and then > the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, > Stream-Steam join task Id will start from 1 to 200. Finally, it will get > wrong StateStore path through TaskPartitionId, and failed with error reading > state store delta file. > {code:java} > LocalTableScan Scan Relation DataSourceV2Scan DataSourceV2Scan > | | | | > BroadcastExchange | Exchange(200) Exchange(200) > | | | | > | | | | > \/ \ / >\ /\ / > BroadcastHashJoin StreamingSymmetricHashJoin > \ / >\ / > \ / > Union > {code} > In my job, I closed the auto BroadcastJoin feature (set > spark.sql.autoBroadcastJoinThreshold=-1) to walk around this bug. 
We should > make the StateStore path determinate but not depends on TaskPartitionId. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional com
[jira] [Commented] (SPARK-29438) Failed to get state store in stream-stream join
[ https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949365#comment-16949365 ] Genmao Yu commented on SPARK-29438: --- There are several optional alternatives to resolve this issue: * Adding some rules to make the stream-stream join task partition id more determinate. In above cases, we can reorder the LogicalPlan in Union, i.e. making`StreamingSymmetricHashJoin` prior to `SortMergeJoin/BroadcastHashJoin`. * As said in desc, the StateStore path should not depend on the TaskPartitionId. We may get the StateStore path from StateStoreCoordinator. But this may increase the RPC load in Driver. * Dynamically disable the `autoBroadcastJoin` in some rules. > Failed to get state store in stream-stream join > --- > > Key: SPARK-29438 > URL: https://issues.apache.org/jira/browse/SPARK-29438 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.4 >Reporter: Genmao Yu >Priority: Critical > > Now, Spark use the `TaskPartitionId` to determine the StateStore path. > {code:java} > TaskPartitionId \ > StateStoreVersion --> StoreProviderId -> StateStore > StateStoreName/ > {code} > In spark stages, the task partition id is determined by the number of tasks. > As we said the StateStore file path depends on the task partition id. So if > stream-stream join task partition id is changed against last batch, it will > get wrong StateStore data or fail with non-exist StateStore data. In some > corner cases, it happened: > {code:java} > val df3 = streamDf1.join(streamDf2) > val df5 = streamDf3.join(batchDf4) > val df = df3.union(df5) > df.writeStream...start() > {code} > A simplified DAG like this: > {code:java} > DataSourceV2Scan Scan Relation DataSourceV2Scan DataSourceV2Scan > (streamDf3)| (streamDf1)(streamDf2) > | | | | > Exchange(200) Exchange(200) Exchange(200) Exchange(200) > | | | | >SortSort | | > \ / \ / > \/ \ / > SortMergeJoinStreamingSymmetricHashJoin > \ / >\ / > \ / > Union > {code} > Stream-Steam join task Id will start from 200 to 399 as they are in the same > stage with `SortMergeJoin`. But when there is no new incoming data in > `streamDf3` in some batch, it will generate a empty LocalRelation, and then > the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, > Stream-Steam join task Id will start from 1 to 200. Finally, it will get > wrong StateStore path through TaskPartitionId, and failed with error reading > state store delta file. > {code:java} > LocalTableScan Scan Relation DataSourceV2Scan DataSourceV2Scan > | | | | > BroadcastExchange | Exchange(200) Exchange(200) > | | | | > | | | | > \/ \ / >\ /\ / > BroadcastHashJoin StreamingSymmetricHashJoin > \ / >\ / > \ / > Union > {code} > In my job, I closed the auto BroadcastJoin feature (set > spark.sql.autoBroadcastJoinThreshold=-1) to walk around this bug. We should > make the StateStore path determinate but not depends on TaskPartitionId. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-29438) Failed to get state store in stream-stream join
[ https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-29438: -- Description: Now, Spark use the `TaskPartitionId` to determine the StateStore path. {code:java} TaskPartitionId \ StateStoreVersion --> StoreProviderId -> StateStore StateStoreName/ {code} In spark stages, the task partition id is determined by the number of tasks. As we said the StateStore file path depends on the task partition id. So if stream-stream join task partition id is changed against last batch, it will get wrong StateStore data or fail with non-exist StateStore data. In some corner cases, it happened: {code:java} val df3 = streamDf1.join(streamDf2) val df5 = streamDf3.join(batchDf4) val df = df3.union(df5) df.writeStream...start() {code} A simplified DAG like this: {code:java} DataSourceV2Scan Scan Relation DataSourceV2Scan DataSourceV2Scan (streamDf3)| (streamDf1)(streamDf2) | | | | Exchange(200) Exchange(200) Exchange(200) Exchange(200) | | | | SortSort | | \ / \ / \/ \ / SortMergeJoinStreamingSymmetricHashJoin \ / \ / \ / Union {code} Stream-Steam join task Id will start from 200 to 399 as they are in the same stage with `SortMergeJoin`. But when there is no new incoming data in `streamDf3` in some batch, it will generate a empty LocalRelation, and then the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, Stream-Steam join task Id will start from 1 to 200. Finally, it will get wrong StateStore path through TaskPartitionId, and failed with error reading state store delta file. {code:java} LocalTableScan Scan Relation DataSourceV2Scan DataSourceV2Scan | | | | BroadcastExchange | Exchange(200) Exchange(200) | | | | | | | | \/ \ / \ /\ / BroadcastHashJoin StreamingSymmetricHashJoin \ / \ / \ / Union {code} In my job, I closed the auto BroadcastJoin feature (set spark.sql.autoBroadcastJoinThreshold=-1) to walk around this bug. We should make the StateStore path determinate but not depends on TaskPartitionId. was: Now, Spark use the `TaskPartitionId` to determine the StateStore path. {code:java} TaskPartitionId \ StateStoreVersion --> StoreProviderId -> StateStore StateStoreName/ {code} In spark stages, the task partition id is determined by the number of tasks. As we said the StateStore file path depends on the task partition id. So if stream-stream join task partition id is changed against last batch, it will get wrong StateStore data or fail with non-exist StateStore data. In some corner cases, it happened: {code:java} val df3 = streamDf1.join(streamDf2) val df5 = streamDf3.join(batchDf4) val df = df3.union(df5) df.writeStream...start() {code} A simplified DAG like this: {code:java} DataSourceV2Scan Scan Relation DataSourceV2Scan DataSourceV2Scan (streamDf3)| (streamDf1)(streamDf2) | | | | Exchange(200) Exchange(200) Exchange(200) Exchange(200) | | | | SortSort | | \ / \ / \/ \ / SortMergeJoinStreamingSymmetricHashJoin \ / \ / \ / Union {code} Stream-Steam join task Id will start from 200 to 399 as they are in the same stage with `SortMergeJoin`. But when there is no new incoming data in `streamDf3` in some batch, it will generate a empty LocalRelation, and then the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, Stream-Steam join task Id will start from 1 to 200. Finally, it will get wrong StateStore path through TaskPartitionId, and failed with error reading state store delta fil
[jira] [Created] (SPARK-29438) Failed to get state store in stream-stream join
Genmao Yu created SPARK-29438: - Summary: Failed to get state store in stream-stream join Key: SPARK-29438 URL: https://issues.apache.org/jira/browse/SPARK-29438 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.4 Reporter: Genmao Yu Now, Spark uses the `TaskPartitionId` to determine the StateStore path. {code:java} TaskPartitionId \ StateStoreVersion --> StoreProviderId -> StateStore StateStoreName/ {code} In spark stages, the task partition id is determined by the number of tasks. As we said, the StateStore file path depends on the task partition id. So if the stream-stream join task partition id changes against the last batch, it will get wrong StateStore data or fail with non-existent StateStore data. In some corner cases, this happened: {code:java} val df3 = streamDf1.join(streamDf2) val df5 = streamDf3.join(batchDf4) val df = df3.union(df5) df.writeStream...start() {code} A simplified DAG like this: {code:java} DataSourceV2Scan Scan Relation DataSourceV2Scan DataSourceV2Scan (streamDf3)| (streamDf1)(streamDf2) | | | | Exchange(200) Exchange(200) Exchange(200) Exchange(200) | | | | SortSort | | \ / \ / \/ \ / SortMergeJoinStreamingSymmetricHashJoin \ / \ / \ / Union {code} Stream-Stream join task ids will range from 200 to 399 as they are in the same stage with `SortMergeJoin`. But when there is no new incoming data in `streamDf3` in some batch, it will generate an empty LocalRelation, and then the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, Stream-Stream join task ids will range from 1 to 200. Finally, it will get the wrong StateStore path through TaskPartitionId, and fail with an error reading the state store delta file. {code:java} LocalTableScan Scan Relation DataSourceV2Scan DataSourceV2Scan | | | | BroadcastExchange | Exchange(200) Exchange(200) | | | | | | | | \/ \ / \ /\ / BroadcastHashJoin StreamingSymmetricHashJoin \ / \ / \ / Union {code} In my job, I disabled the auto BroadcastJoin feature (set spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should make the StateStore path deterministic and not dependent on TaskPartitionId. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
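The workaround from the description, spelled out as a hedged snippet (the SparkSession setup is assumed): with automatic broadcast joins disabled, the SortMergeJoin stays in the plan even when one input collapses to an empty LocalRelation, so the stateful join keeps the same task partition id range in every micro-batch.

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stream-stream-join").getOrCreate()

// -1 disables the broadcast size threshold, so SortMergeJoin is never swapped for
// BroadcastHashJoin and the StreamingSymmetricHashJoin tasks keep stable partition ids.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
{code}

The longer-term fix suggested above is to derive the state store path from something stable (the operator and a logical partition id) rather than the physical task partition id.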
[jira] [Updated] (SPARK-28256) Failed to initialize FileContextBasedCheckpointFileManager with uri without authority
[ https://issues.apache.org/jira/browse/SPARK-28256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-28256: -- Description: reproduce code {code:sql} CREATE TABLE `user_click_count` (`userId` STRING, `click` BIGINT) USING org.apache.spark.sql.json OPTIONS (path 'hdfs:///tmp/test'); {code} error: {code:java} java.lang.RuntimeException: java.lang.reflect.InvocationTargetException at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:136) at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:165) at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:250) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911) at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339) at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:456) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:297) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:189) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:63) at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46) at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:85) at org.apache.spark.sql.execution.streaming.FileStreamSink.(FileStreamSink.scala:98) at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:379) ... Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:134) ... 67 more Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without authority: hdfs:/tmp/test/_spark_metadata at org.apache.hadoop.fs.AbstractFileSystem.getUri(AbstractFileSystem.java:313) at org.apache.hadoop.fs.AbstractFileSystem.(AbstractFileSystem.java:266) at org.apache.hadoop.fs.Hdfs.(Hdfs.java:80) ... 
72 more {code} was: {code:java} java.lang.RuntimeException: java.lang.reflect.InvocationTargetException at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:136) at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:165) at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:250) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911) at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339) at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:456) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:297) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:189) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:63) at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46) at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:85) at org.apache.spark.sql.execution.streaming.FileStreamSink.(FileStreamSink.scala:98) at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:379) ... Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Na
[jira] [Updated] (SPARK-28256) Failed to initialize FileContextBasedCheckpointFileManager with uri without authority
[ https://issues.apache.org/jira/browse/SPARK-28256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-28256: -- Description: {code:java} java.lang.RuntimeException: java.lang.reflect.InvocationTargetException at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:136) at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:165) at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:250) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911) at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339) at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:456) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:297) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:189) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:63) at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46) at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:85) at org.apache.spark.sql.execution.streaming.FileStreamSink.(FileStreamSink.scala:98) at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:379) ... Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:134) ... 67 more Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without authority: hdfs:/tmp/test/_spark_metadata at org.apache.hadoop.fs.AbstractFileSystem.getUri(AbstractFileSystem.java:313) at org.apache.hadoop.fs.AbstractFileSystem.(AbstractFileSystem.java:266) at org.apache.hadoop.fs.Hdfs.(Hdfs.java:80) ... 
72 more {code} was: {code} java.lang.RuntimeException: java.lang.reflect.InvocationTargetException at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:136) at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:165) at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:250) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911) at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339) at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:456) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:297) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:189) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:63) at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46) at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:85) at org.apache.spark.sql.execution.streaming.FileStreamSink.(FileStreamSink.scala:98) at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:379) ... Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newIns
[jira] [Created] (SPARK-28256) Failed to initialize FileContextBasedCheckpointFileManager with uri without authority
Genmao Yu created SPARK-28256: - Summary: Failed to initialize FileContextBasedCheckpointFileManager with uri without authority Key: SPARK-28256 URL: https://issues.apache.org/jira/browse/SPARK-28256 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.0.0 Reporter: Genmao Yu {code} java.lang.RuntimeException: java.lang.reflect.InvocationTargetException at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:136) at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:165) at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:250) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342) at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911) at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339) at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:456) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.(CheckpointFileManager.scala:297) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:189) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:63) at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46) at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.(FileStreamSinkLog.scala:85) at org.apache.spark.sql.execution.streaming.FileStreamSink.(FileStreamSink.scala:98) at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297) at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:379) ... Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:134) ... 67 more Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without authority: hdfs:/tmp/test13/_spark_metadata at org.apache.hadoop.fs.AbstractFileSystem.getUri(AbstractFileSystem.java:313) at org.apache.hadoop.fs.AbstractFileSystem.(AbstractFileSystem.java:266) at org.apache.hadoop.fs.Hdfs.(Hdfs.java:80) ... 72 more {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
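A hedged sketch of the direction implied by the error, in plain Hadoop terms rather than the actual CheckpointFileManager change: qualify the metadata path against the default file system before building a FileContext, so an authority-less URI such as hdfs:///tmp/test picks up the scheme and authority of fs.defaultFS instead of failing in AbstractFileSystem.getUri.

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}

val conf = new Configuration()
val raw = new Path("hdfs:///tmp/test/_spark_metadata") // no authority in the URI

// makeQualified fills in the missing authority from fs.defaultFS (e.g. hdfs://namenode:8020).
val qualified = raw.getFileSystem(conf).makeQualified(raw)

// FileContext now receives a URI with an authority and no longer rejects it.
val fc = FileContext.getFileContext(qualified.toUri, conf)
{code}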
[jira] [Created] (SPARK-28158) Hive UDFs support UDT types
Genmao Yu created SPARK-28158: - Summary: Hive UDFs support UDT types Key: SPARK-28158 URL: https://issues.apache.org/jira/browse/SPARK-28158 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.3, 3.0.0 Reporter: Genmao Yu -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27717) support UNION in continuous processing
Genmao Yu created SPARK-27717: - Summary: support UNION in continuous processing Key: SPARK-27717 URL: https://issues.apache.org/jira/browse/SPARK-27717 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 3.0.0 Reporter: Genmao Yu -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27715) SQL query details in UI do not show in the correct format.
Genmao Yu created SPARK-27715: - Summary: SQL query details in UI do not show in the correct format. Key: SPARK-27715 URL: https://issues.apache.org/jira/browse/SPARK-27715 Project: Spark Issue Type: Bug Components: SQL, Web UI Affects Versions: 3.0.0 Reporter: Genmao Yu -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26302) retainedBatches configuration can eat up memory on driver
[ https://issues.apache.org/jira/browse/SPARK-26302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839253#comment-16839253 ] Genmao Yu edited comment on SPARK-26302 at 5/14/19 9:27 AM: Adding some warning in documentation is reasonable. was (Author: unclegen): Add some warning in documentation is reasonable. > retainedBatches configuration can eat up memory on driver > - > > Key: SPARK-26302 > URL: https://issues.apache.org/jira/browse/SPARK-26302 > Project: Spark > Issue Type: Improvement > Components: Documentation, DStreams >Affects Versions: 2.4.0 >Reporter: Behroz Sikander >Priority: Minor > Attachments: heap_dump_detail.png > > > The documentation for configuration "spark.streaming.ui.retainedBatches" says > "How many batches the Spark Streaming UI and status APIs remember before > garbage collecting" > The default for this configuration is 1000. > From our experience, the documentation is incomplete and we found it the hard > way. > The size of a single BatchUIData is around 750KB. Increasing this value to > something like 5000 increases the total size to ~4GB. > If your driver heap is not big enough, the job starts to slow down, has > frequent GCs and has long scheduling days. Once the heap is full, the job > cannot be recovered. > A note of caution should be added to the documentation to let users know the > impact of this seemingly harmless configuration property. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26302) retainedBatches configuration can eat up memory on driver
[ https://issues.apache.org/jira/browse/SPARK-26302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839253#comment-16839253 ] Genmao Yu commented on SPARK-26302: --- Add some warning in documentation is reasonable. > retainedBatches configuration can eat up memory on driver > - > > Key: SPARK-26302 > URL: https://issues.apache.org/jira/browse/SPARK-26302 > Project: Spark > Issue Type: Improvement > Components: Documentation, DStreams >Affects Versions: 2.4.0 >Reporter: Behroz Sikander >Priority: Minor > Attachments: heap_dump_detail.png > > > The documentation for configuration "spark.streaming.ui.retainedBatches" says > "How many batches the Spark Streaming UI and status APIs remember before > garbage collecting" > The default for this configuration is 1000. > From our experience, the documentation is incomplete and we found it the hard > way. > The size of a single BatchUIData is around 750KB. Increasing this value to > something like 5000 increases the total size to ~4GB. > If your driver heap is not big enough, the job starts to slow down, has > frequent GCs and has long scheduling days. Once the heap is full, the job > cannot be recovered. > A note of caution should be added to the documentation to let users know the > impact of this seemingly harmless configuration property. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
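To make the impact concrete, a back-of-the-envelope sketch using the ~750 KB per-batch estimate from the report above; this is arithmetic, not a measurement, and it roughly matches the ~4 GB figure mentioned for 5000 retained batches.
{code:scala}
// Rough driver-heap budget for spark.streaming.ui.retainedBatches.
object RetainedBatchesBudget {
  def main(args: Array[String]): Unit = {
    val perBatchBytes = 750L * 1024 // ~750 KB per BatchUIData, per the report
    Seq(1000, 5000).foreach { retained =>
      val gib = perBatchBytes * retained / math.pow(1024, 3)
      println(f"spark.streaming.ui.retainedBatches=$retained%-5d -> ~$gib%.1f GiB of driver heap")
    }
  }
}
{code}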
[jira] [Commented] (SPARK-26278) V2 Streaming sources cannot be written to V1 sinks
[ https://issues.apache.org/jira/browse/SPARK-26278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839246#comment-16839246 ] Genmao Yu commented on SPARK-26278: --- [~jpolchlo] Could you please close this jira? This issue has been fixed. > V2 Streaming sources cannot be written to V1 sinks > -- > > Key: SPARK-26278 > URL: https://issues.apache.org/jira/browse/SPARK-26278 > Project: Spark > Issue Type: Bug > Components: Input/Output, Structured Streaming >Affects Versions: 2.3.2 >Reporter: Justin Polchlopek >Priority: Major > > Starting from a streaming DataFrame derived from a custom v2 MicroBatch > reader, we have > {code:java} > val df: DataFrame = ... > assert(df.isStreaming) > val outputFormat = "orc" // also applies to "csv" and "json" but not > "console" > df.writeStream > .format(outputFormat) > .option("checkpointLocation", "/tmp/checkpoints") > .option("path", "/tmp/result") > .start > {code} > This code fails with the following stack trace: > {code:java} > 2018-12-04 08:24:27 ERROR MicroBatchExecution:91 - Query [id = > 193f97bf-8064-4658-8aa6-0f481919eafe, runId = > e96ed7e5-aaf4-4ef4-a3f3-05fe0b01a715] terminated with error > java.lang.ClassCastException: > org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to > org.apache.spark.sql.sources.v2.reader.streaming.Offset > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at > org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > at > org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){code} > I'm filing this issue on the suggestion of [~mojodna] who suggests that this > problem could be resolved by backporting streaming sinks fro
[jira] [Commented] (SPARK-27634) deleteCheckpointOnStop should be configurable
[ https://issues.apache.org/jira/browse/SPARK-27634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833281#comment-16833281 ] Genmao Yu commented on SPARK-27634: --- Do not add patch here. You can submit a PR to Spark. [How to contribute code to Spark|http://spark.apache.org/contributing.html] > deleteCheckpointOnStop should be configurable > - > > Key: SPARK-27634 > URL: https://issues.apache.org/jira/browse/SPARK-27634 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.2 >Reporter: Yu Wang >Priority: Minor > Fix For: 2.4.2 > > Attachments: SPARK-27634.patch > > > we need to delete checkpoint file after running the stream application > multiple times, so deleteCheckpointOnStop should be configurable -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-27503) JobGenerator thread exits on some fatal errors but the application keeps running
Genmao Yu created SPARK-27503: - Summary: JobGenerator thread exits on some fatal errors but the application keeps running Key: SPARK-27503 URL: https://issues.apache.org/jira/browse/SPARK-27503 Project: Spark Issue Type: Bug Components: DStreams Affects Versions: 3.0.0 Reporter: Genmao Yu The JobGenerator thread (as well as other EventLoop threads) may exit on a fatal error, such as an OOM, while the Spark Streaming job keeps running with no batch jobs being generated. Currently, we only catch and report non-fatal errors:
{code}
override def run(): Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } catch {
        case NonFatal(e) =>
          try {
            onError(e)
          } catch {
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
      }
    }
  } catch {
    case ie: InterruptedException => // exit even if eventQueue is not empty
    case NonFatal(e) => logError("Unexpected error in " + name, e)
  }
}
{code}
In some corner cases, these event threads may exit with an OOM error while the driver thread keeps running. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
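The ticket does not include a patch; the sketch below is only one illustrative way to keep such failures from going unnoticed, using a default uncaught-exception handler that terminates the JVM so the cluster manager can restart the application. The object name is hypothetical.
{code:scala}
// Illustrative only: stop the JVM when a fatal error escapes a background thread,
// instead of leaving a half-dead driver running with no batches being generated.
object FatalErrorWatchdog {
  def install(): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        System.err.println(s"Fatal error in thread ${t.getName}: $e")
        // halt() skips shutdown hooks, which may themselves fail under OOM.
        Runtime.getRuntime.halt(1)
      }
    })
  }
}
{code}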
[jira] [Created] (SPARK-27413) Keep the same epoch pace between driver and executor.
Genmao Yu created SPARK-27413: - Summary: Keep the same epoch pace between driver and executor. Key: SPARK-27413 URL: https://issues.apache.org/jira/browse/SPARK-27413 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.0.0 Reporter: Genmao Yu The pace of epoch generation on the driver and epoch pulling on the executors can differ. If the epoch pulling interval is larger than the epoch generation interval, this results in many empty epochs per partition. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
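For reference, a sketch of the two intervals involved, under the assumption that the executor-side poll interval is controlled by spark.sql.streaming.continuous.executorPollIntervalMs (verify the key against the Spark version in use); the rate source and console sink are for illustration only.
{code:scala}
// Sketch: keep the executor poll interval no larger than the continuous trigger
// interval, so executors do not skip over epochs between polls.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object EpochPaceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("epoch-pace")
      .master("local[2]")
      // Assumed config key; executors poll the driver for new epochs at this interval.
      .config("spark.sql.streaming.continuous.executorPollIntervalMs", "100")
      .getOrCreate()

    val query = spark.readStream.format("rate").load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second")) // epoch generation interval on the driver
      .start()

    query.awaitTermination()
  }
}
{code}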
[jira] [Created] (SPARK-27355) make query execution more sensitive to late or lost epoch messages
Genmao Yu created SPARK-27355: - Summary: make query execution more sensitive to late or lost epoch messages Key: SPARK-27355 URL: https://issues.apache.org/jira/browse/SPARK-27355 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.0.0 Reporter: Genmao Yu In SPARK-23503, we enforce sequencing of committed epochs for Continuous Execution. If the message for epoch n is lost and epoch (n + 1) is ready for commit before epoch n is, epoch (n + 1) will wait for epoch n to be committed first. In the extreme case, we will wait for `epochBacklogQueueSize` (1 by default) epochs and then fail. There is no need to wait that long before failing the query; we can make the condition more sensitive. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
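A self-contained sketch of the fail-fast idea, with hypothetical names rather than Spark's internal classes: fail as soon as a missing epoch has blocked more than a small number of later epochs, instead of waiting for the full backlog to fill.
{code:scala}
// Hypothetical tracker for in-order epoch commits.
import scala.collection.mutable

class EpochCommitTracker(maxBlockedEpochs: Int) {
  private var lastCommitted: Long = -1L
  private val blocked = mutable.SortedSet.empty[Long]

  /** Called when `epoch` has finished and is waiting to be committed in order. */
  def onEpochReady(epoch: Long): Unit = {
    if (epoch == lastCommitted + 1) {
      lastCommitted = epoch
      // Drain consecutive epochs that were queued behind this one.
      while (blocked.contains(lastCommitted + 1)) {
        blocked.remove(lastCommitted + 1)
        lastCommitted += 1
      }
    } else {
      blocked += epoch
      if (blocked.size > maxBlockedEpochs) {
        throw new IllegalStateException(
          s"Epoch ${lastCommitted + 1} appears late or lost; ${blocked.size} later epochs are blocked")
      }
    }
  }
}
{code}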
[jira] [Commented] (SPARK-27218) spark-sql-kafka-0-10 startingOffset=earliest not working as expected with streaming
[ https://issues.apache.org/jira/browse/SPARK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798635#comment-16798635 ] Genmao Yu commented on SPARK-27218: --- Could you please test it on master branch? > spark-sql-kafka-0-10 startingOffset=earliest not working as expected with > streaming > --- > > Key: SPARK-27218 > URL: https://issues.apache.org/jira/browse/SPARK-27218 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 > Environment: Windows 10, spark-2.4.0-bin-hadoop2.7 >Reporter: Emanuele Sabellico >Priority: Minor > > Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 > with a code like this: > {noformat} > spark.readStream > .format("kafka") > .option("subscribe", "test1") > .option("startingOffsets", "earliest") > .load(){noformat} > I find that Spark doesn't start from the earliest offset but from the latest. > Or better, initially it gets the earliest offsets but then it does a seek to > end, skipping the messages in-between. > In the logs I find this: > {noformat} > 2019-03-20 11:59:50 INFO KafkaMicroBatchReader:54 - Initial offsets: > {"test1":{"0":1740}} > 2019-03-20 11:59:50 INFO Fetcher:583 - [Consumer clientId=consumer-1, > groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0] > Resetting offset for partition test1-0 to offset 15922. > {noformat} > Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ > the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is > a _consumer.seekToEnd(partitions)_ > According to the documentation I was expecting that the streaming would have > started from the earliest offset in this case. Is there something that I'm > getting wrong or doing wrong? > Thanks in advance! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591151#comment-16591151 ] Genmao Yu commented on SPARK-24630: --- [~Jackey Lee] I am glad to participate in code review. > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565252#comment-16565252 ] Genmao Yu commented on SPARK-24630: --- [~Jackey Lee] Pretty good! We also have the SQL Streaming demand. Is there any more detailed design doc? Or we can list those streaming sql syntax which is different from batch firstly. > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561762#comment-16561762 ] Genmao Yu edited comment on SPARK-24630 at 7/30/18 11:27 AM: - Practice to add the StreamSQL DDL, like this: {code:java} spark-sql> create source stream service_logs with ("type"="kafka","bootstrap.servers"="x.x.x.x:9092", "topic"="service_log"); Time taken: 1.833 seconds spark-sql> desc service_logs; key binary NULL value binary NULL topic string NULL partition int NULL offset bigint NULL timestamp timestamp NULL timestampType int NULL Time taken: 0.394 seconds, Fetched 7 row(s) spark-sql> create sink stream analysis with ("type"="kafka", "outputMode"="complete", "bootstrap.servers"="x.x.x.x:9092", "topic"="analysis", "checkpointLocation"="hdfs:///tmp/cp0"); Time taken: 0.027 seconds spark-sql> insert into analysis select key, value, count(*) from service_logs group by key, value; Time taken: 0.355 seconds {code} was (Author: unclegen): Try to add the StreamSQL DDL, like this: {code:java} spark-sql> create source stream service_logs with ("type"="kafka","bootstrap.servers"="x.x.x.x:9092", "topic"="service_log"); Time taken: 1.833 seconds spark-sql> desc service_logs; key binary NULL value binary NULL topic string NULL partition int NULL offset bigint NULL timestamp timestamp NULL timestampType int NULL Time taken: 0.394 seconds, Fetched 7 row(s) spark-sql> create sink stream analysis with ("type"="kafka", "outputMode"="complete", "bootstrap.servers"="x.x.x.x:9092", "topic"="analysis", "checkpointLocation"="hdfs:///tmp/cp0"); Time taken: 0.027 seconds spark-sql> insert into analysis select key, value, count(*) from service_logs group by key, value; Time taken: 0.355 seconds {code} > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-24630: -- Attachment: (was: image-2018-07-30-18-48-38-352.png) > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-24630: -- Attachment: (was: image-2018-07-30-18-06-30-506.png) > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561762#comment-16561762 ] Genmao Yu edited comment on SPARK-24630 at 7/30/18 10:53 AM: - Try to add the StreamSQL DDL, like this: {code:java} spark-sql> create source stream service_logs with ("type"="kafka","bootstrap.servers"="x.x.x.x:9092", "topic"="service_log"); Time taken: 1.833 seconds spark-sql> desc service_logs; key binary NULL value binary NULL topic string NULL partition int NULL offset bigint NULL timestamp timestamp NULL timestampType int NULL Time taken: 0.394 seconds, Fetched 7 row(s) spark-sql> create sink stream analysis with ("type"="kafka", "outputMode"="complete", "bootstrap.servers"="x.x.x.x:9092", "topic"="analysis", "checkpointLocation"="hdfs:///tmp/cp0"); Time taken: 0.027 seconds spark-sql> insert into analysis select key, value, count(*) from service_logs group by key, value; Time taken: 0.355 seconds {code} was (Author: unclegen): Try to add the StreamSQL DDL, like this: !image-2018-07-30-18-48-38-352.png! > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf, > image-2018-07-30-18-06-30-506.png, image-2018-07-30-18-48-38-352.png > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561762#comment-16561762 ] Genmao Yu commented on SPARK-24630: --- Try to add the StreamSQL DDL, like this: !image-2018-07-30-18-48-38-352.png! > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf, > image-2018-07-30-18-06-30-506.png, image-2018-07-30-18-48-38-352.png > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-24630: -- Attachment: image-2018-07-30-18-48-38-352.png > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf, > image-2018-07-30-18-06-30-506.png, image-2018-07-30-18-48-38-352.png > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-24630: -- Attachment: image-2018-07-30-18-06-30-506.png > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf, image-2018-07-30-18-06-30-506.png > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556837#comment-16556837 ] Genmao Yu edited comment on SPARK-24630 at 7/26/18 3:24 AM: [~zsxwing] Is there plan to better support SQL on streaming? like provide stream table ddl, window and watermark syntax on stream etc. was (Author: unclegen): [~zsxwing] Is there plan to better support SQL on streaming? like provide stream table ddl, window ** and watermark ** syntax on stream etc. > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556837#comment-16556837 ] Genmao Yu commented on SPARK-24630: --- [~zsxwing] Is there plan to better support SQL on streaming? like provide stream table ddl, window ** and watermark ** syntax on stream etc. > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554043#comment-16554043 ] Genmao Yu edited comment on SPARK-24630 at 7/24/18 11:38 AM: - [~zsxwing] {{Structured Streaming supports standard SQL as the batch queries, so the users can switch their queries between batch and streaming easily.}} IIUC, there are some queries we can not switch from stream to batch, like "*groupBy window a**ggregation"* or "*over window a**ggregation"* on stream. Isn't it? was (Author: unclegen): {{Structured Streaming supports standard SQL as the batch queries, so the users can switch their queries between batch and streaming easily.}} IIUC, there are some queries we can not switch from stream to batch, like "*groupBy window a**ggregation"* or "*over window a**ggregation"* on stream. Isn't it? > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554043#comment-16554043 ] Genmao Yu commented on SPARK-24630: --- {{Structured Streaming supports standard SQL as the batch queries, so the users can switch their queries between batch and streaming easily.}} IIUC, there are some queries we can not switch from stream to batch, like "*groupBy window a**ggregation"*** or "*over window a**ggregation"*** on stream. Isn't it? > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554043#comment-16554043 ] Genmao Yu edited comment on SPARK-24630 at 7/24/18 10:07 AM: - {{Structured Streaming supports standard SQL as the batch queries, so the users can switch their queries between batch and streaming easily.}} IIUC, there are some queries we can not switch from stream to batch, like "*groupBy window a**ggregation"* or "*over window a**ggregation"* on stream. Isn't it? was (Author: unclegen): {{Structured Streaming supports standard SQL as the batch queries, so the users can switch their queries between batch and streaming easily.}} IIUC, there are some queries we can not switch from stream to batch, like "*groupBy window a**ggregation"*** or "*over window a**ggregation"*** on stream. Isn't it? > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL(which is actually based on Calcite), > SQLStream, StormSQL all provide a stream type SQL interface, with which users > with little knowledge about streaming, can easily develop a flow system > processing model. In Spark, we can also support SQL API based on > StructStreamig. > To support for SQL Streaming, there are two key points: > 1, Analysis should be able to parse streaming type SQL. > 2, Analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20672) Keep the `isStreaming` property in triggerLogicalPlan in Structured Streaming
Genmao Yu created SPARK-20672: - Summary: Keep the `isStreaming` property in triggerLogicalPlan in Structured Streaming Key: SPARK-20672 URL: https://issues.apache.org/jira/browse/SPARK-20672 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.1.1 Reporter: Genmao Yu In Structured Streaming, the "isStreaming" property is eliminated in each triggerLogicalPlan. As a result, some rules are mistakenly applied to this triggerLogicalPlan. So we should refactor the existing code to better execute both batch queries and Structured Streaming queries. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20139) Spark UI reports partial success for completed stage while log shows all tasks are finished
[ https://issues.apache.org/jira/browse/SPARK-20139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950244#comment-15950244 ] Genmao Yu commented on SPARK-20139: --- The event queue exceeds its capacity, so new events will be dropped. > Spark UI reports partial success for completed stage while log shows all > tasks are finished > --- > > Key: SPARK-20139 > URL: https://issues.apache.org/jira/browse/SPARK-20139 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.1.0 >Reporter: Etti Gur > Attachments: screenshot-1.png > > > Spark UI reports partial success for completed stage while log shows all > tasks are finished - i.e.: > We have a stage that is presented under completed stages on spark UI, > but the successful tasks are shown like so: (146372/524964) not as you'd > expect (524964/524964) > Looking at the application master log shows all tasks in that stage are > successful: > 17/03/29 09:45:49 INFO TaskSetManager: Finished task 522973.0 in stage 0.0 > (TID 522973) in 1163910 ms on ip-10-1-15-34.ec2.internal (executor 116) > (524963/524964) > 17/03/29 09:45:49 INFO TaskSetManager: Finished task 12508.0 in stage 2.0 > (TID 537472) in 241250 ms on ip-10-1-15-14.ec2.internal (executor 38) > (20234/20262) > 17/03/29 09:45:49 INFO TaskSetManager: Finished task 12465.0 in stage 2.0 > (TID 537429) in 241994 ms on ip-10-1-15-106.ec2.internal (executor 133) > (20235/20262) > 17/03/29 09:45:49 INFO TaskSetManager: Finished task 15079.0 in stage 2.0 > (TID 540043) in 202889 ms on ip-10-1-15-173.ec2.internal (executor 295) > (20236/20262) > 17/03/29 09:45:49 INFO TaskSetManager: Finished task 19828.0 in stage 2.0 > (TID 544792) in 137845 ms on ip-10-1-15-147.ec2.internal (executor 43) > (20237/20262) > 17/03/29 09:45:50 INFO TaskSetManager: Finished task 19072.0 in stage 2.0 > (TID 544036) in 147363 ms on ip-10-1-15-19.ec2.internal (executor 175) > (20238/20262) > 17/03/29 09:45:50 INFO TaskSetManager: Finished task 524146.0 in stage 0.0 > (TID 524146) in 889950 ms on ip-10-1-15-72.ec2.internal (executor 74) > (524964/524964) > Also in the log we get an error: > 17/03/29 08:24:16 ERROR LiveListenerBus: Dropping SparkListenerEvent because > no remaining room in event queue. This likely means one of the SparkListeners > is too slow and cannot keep up with the rate at which tasks are being started > by the scheduler. > This looks like the stage is indeed completed with all its tasks but UI shows > like not all tasks really finished. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
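As a reference for readers hitting the same dropped-event warning, a sketch of the usual mitigation: raising the listener bus queue capacity. The exact config key varies by release (older versions used spark.scheduler.listenerbus.eventqueue.size); treat the key and value below as assumptions to check against the documentation for the version in use.
{code:scala}
// Sketch only: enlarge the listener event queue so bursts of task-end events
// are less likely to be dropped before the UI listener processes them.
import org.apache.spark.sql.SparkSession

object ListenerBusCapacity {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("listener-bus-capacity")
      .master("local[*]")
      .config("spark.scheduler.listenerbus.eventqueue.capacity", "20000") // assumed key
      .getOrCreate()
    println(spark.sparkContext.getConf.get("spark.scheduler.listenerbus.eventqueue.capacity"))
    spark.stop()
  }
}
{code}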
[jira] [Commented] (SPARK-20065) Empty output files created for aggregation query in append mode
[ https://issues.apache.org/jira/browse/SPARK-20065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15937904#comment-15937904 ] Genmao Yu commented on SPARK-20065: --- Make sense, I will give a fast update. > Empty output files created for aggregation query in append mode > --- > > Key: SPARK-20065 > URL: https://issues.apache.org/jira/browse/SPARK-20065 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Silvio Fiorito > > I've got a Kafka topic which I'm querying, running a windowed aggregation, > with a 30 second watermark, 10 second trigger, writing out to Parquet with > append output mode. > Every 10 second trigger generates a file, regardless of whether there was any > data for that trigger, or whether any records were actually finalized by the > watermark. > Is this expected behavior or should it not write out these empty files? > {code} > val df = spark.readStream.format("kafka") > val query = df > .withWatermark("timestamp", "30 seconds") > .groupBy(window($"timestamp", "10 seconds")) > .count() > .select(date_format($"window.start", "HH:mm:ss").as("time"), $"count") > query > .writeStream > .format("parquet") > .option("checkpointLocation", aggChk) > .trigger(ProcessingTime("10 seconds")) > .outputMode("append") > .start(aggPath) > {code} > As the query executes, do a file listing on "aggPath" and you'll see 339 byte > files at a minimum until we arrive at the first watermark and the initial > batch is finalized. Even after that though, as there are empty batches it'll > keep generating empty files every trigger. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20061) Reading a file with colon (:) from S3 fails with URISyntaxException
[ https://issues.apache.org/jira/browse/SPARK-20061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15937675#comment-15937675 ] Genmao Yu commented on SPARK-20061: --- Colon is not supported in hadoop, see [HDFS-13|https://issues.apache.org/jira/browse/HDFS-13] > Reading a file with colon (:) from S3 fails with URISyntaxException > --- > > Key: SPARK-20061 > URL: https://issues.apache.org/jira/browse/SPARK-20061 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 > Environment: EC2, AWS >Reporter: Michel Lemay > > When reading a bunch of files from s3 using wildcards, it fails with the > following exception: > {code} > scala> val fn = "s3a://mybucket/path/*/" > scala> val ds = spark.readStream.schema(schema).json(fn) > java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative > path in absolute URI: > 2017-01-06T20:33:45.255-analyticsqa-49569270507599054034141623773442922465540524816321216514.json > at org.apache.hadoop.fs.Path.initialize(Path.java:205) > at org.apache.hadoop.fs.Path.(Path.java:171) > at org.apache.hadoop.fs.Path.(Path.java:93) > at org.apache.hadoop.fs.Globber.glob(Globber.java:241) > at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1657) > at > org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:237) > at > org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:243) > at > org.apache.spark.sql.execution.datasources.DataSource$$anonfun$2.apply(DataSource.scala:131) > at > org.apache.spark.sql.execution.datasources.DataSource$$anonfun$2.apply(DataSource.scala:127) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > at scala.collection.immutable.List.flatMap(List.scala:344) > at > org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:127) > at > org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:124) > at > org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:138) > at > org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:229) > at > org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87) > at > org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87) > at > org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) > at > org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124) > at > org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:133) > at > org.apache.spark.sql.streaming.DataStreamReader.json(DataStreamReader.scala:181) > ... 50 elided > Caused by: java.net.URISyntaxException: Relative path in absolute URI: > 2017-01-06T20:33:45.255-analyticsqa-49569270507599054034141623773442922465540524816321216514.json > at java.net.URI.checkPath(URI.java:1823) > at java.net.URI.(URI.java:745) > at org.apache.hadoop.fs.Path.initialize(Path.java:202) > ... 
73 more > {code} > The file in question sits at the root of s3a://mybucket/path/ > {code} > aws s3 ls s3://mybucket/path/ >PRE subfolder1/ >PRE subfolder2/ > ... > 2017-01-06 20:33:46 1383 > 2017-01-06T20:33:45.255-analyticsqa-49569270507599054034141623773442922465540524816321216514.json > ... > {code} > Removing the wildcard from path make it work but it obviously does misses all > files in subdirectories. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
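A minimal sketch of the underlying limitation (HDFS-13), independent of S3: Hadoop's Path treats the text before the first colon of a relative name as a URI scheme, which is exactly what the globber trips over when it builds child paths. The file name below mirrors the one in the report.
{code:scala}
// Sketch: only Hadoop's Path class is needed to reproduce the parse failure.
import org.apache.hadoop.fs.Path

object ColonInPathRepro {
  def main(args: Array[String]): Unit = {
    // Fully qualified: the colon sits inside the URI path component and parses fine.
    println(new Path("s3a://mybucket/path/2017-01-06T20:33:45.255-file.json"))

    // Relative child name, as produced during globbing: the text before the first
    // colon is taken as a scheme, so this throws
    // IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI
    println(new Path("2017-01-06T20:33:45.255-file.json"))
  }
}
{code}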
[jira] [Created] (SPARK-20021) Missing backslash in Python code
Genmao Yu created SPARK-20021: - Summary: Missing backslash in Python code Key: SPARK-20021 URL: https://issues.apache.org/jira/browse/SPARK-20021 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19926) Make pyspark exception more readable
[ https://issues.apache.org/jira/browse/SPARK-19926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-19926: -- Description: Exception in pyspark is a little difficult to read. like: {code} Traceback (most recent call last): File "", line 5, in File "/root/dev/spark/dist/python/pyspark/sql/streaming.py", line 853, in start return self._sq(self._jwrite.start()) File "/root/dev/spark/dist/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ File "/root/dev/spark/dist/python/pyspark/sql/utils.py", line 69, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;\nAggregate [window#17, word#5], [window#17 AS window#11, word#5, count(1) AS count#16L]\n+- Filter ((t#6 >= window#17.start) && (t#6 < window#17.end))\n +- Expand [ArrayBuffer(named_struct(start, CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as double))) + cast(0 as bigint)) - cast(1 as bigint)) * 3000) + 0), end, (CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as double))) + cast(0 as bigint)) - cast(1 as bigint)) * 3000) + 0) + 3000)), word#5, t#6-T3ms), ArrayBuffer(named_struct(start, CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as double))) + cast(1 as bigint)) - cast(1 as bigint)) * 3000) + 0), end, (CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as double))) + cast(1 as bigint)) - cast(1 as bigint)) * 3000) + 0) + 3000)), word#5, t#6-T3ms)], [window#17, word#5, t#6-T3ms]\n +- EventTimeWatermark t#6: timestamp, interval 30 seconds\n +- Project [cast(word#0 as string) AS word#5, cast(t#1 as timestamp) AS t#6]\n +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@c4079ca,csv,List(),Some(StructType(StructField(word,StringType,true), StructField(t,IntegerType,true))),List(),None,Map(sep -> ;, path -> /tmp/data),None), FileSource[/tmp/data], [word#0, t#1]\n' {code} > Make pyspark exception more readable > > > Key: SPARK-19926 > URL: https://issues.apache.org/jira/browse/SPARK-19926 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu >Priority: Minor > > Exception in pyspark is a little difficult to read. 
> like: > {code} > Traceback (most recent call last): > File "", line 5, in > File "/root/dev/spark/dist/python/pyspark/sql/streaming.py", line 853, in > start > return self._sq(self._jwrite.start()) > File > "/root/dev/spark/dist/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", > line 1133, in __call__ > File "/root/dev/spark/dist/python/pyspark/sql/utils.py", line 69, in deco > raise AnalysisException(s.split(': ', 1)[1], stackTrace) > pyspark.sql.utils.AnalysisException: u'Append output mode not supported when > there are streaming aggregations on streaming DataFrames/DataSets without > watermark;;\nAggregate [window#17, word#5], [window#17 AS window#11, word#5, > count(1) AS count#16L]\n+- Filter ((t#6 >= window#17.start) && (t#6 < > window#17.end))\n +- Expand [ArrayBuffer(named_struct(start, > CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as > double))) + cast(0 as bigint)) - cast(1 as bigint)) * 3000) + 0), end, > (CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as > double))) + cast(0 as bigint)) - cast(1 as bigint)) * 3000) + 0) + > 3000)), word#5, t#6-T3ms), ArrayBuffer(named_struct(start, > CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as > double))) + cast(1 as bigint)) - cast(1 as bigint)) * 3000) + 0), end, > (CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as > double))) + cast(1 as bigint)) - cast(1 as bigint)) * 3000) + 0) + > 3000)), word#5, t#6-T3ms)], [window#17, word#5, t#6-T3ms]\n > +- EventTimeWatermark t#6: timestamp, interval 30 seconds\n +- > Project [cast(word#0 as string) AS word#5, cast(t#1 as timestamp) AS t#6]\n > +- StreamingRelation > DataSource(org.apache.spark.sql.SparkSession@c4079ca,csv,List(),Some(StructType(StructField(word,StringType,true), > StructField(t,IntegerType,true))),List(),None,Map(sep -> ;, path -> > /tmp/data),None), FileSource[/tmp/data], [word#0, t#1]\n' > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19926) Make pyspark exception more readable
Genmao Yu created SPARK-19926: - Summary: Make pyspark exception more readable Key: SPARK-19926 URL: https://issues.apache.org/jira/browse/SPARK-19926 Project: Spark Issue Type: Improvement Components: PySpark Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19853) Uppercase Kafka topics fail when startingOffsets are SpecificOffsets
[ https://issues.apache.org/jira/browse/SPARK-19853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901158#comment-15901158 ] Genmao Yu commented on SPARK-19853: --- Good catch! I will open a pr to fix this. Could you please help to review? > Uppercase Kafka topics fail when startingOffsets are SpecificOffsets > > > Key: SPARK-19853 > URL: https://issues.apache.org/jira/browse/SPARK-19853 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Chris Bowden >Priority: Trivial > > When using the KafkaSource with Structured Streaming, consumer assignments > are not what the user expects if startingOffsets is set to an explicit set of > topics/partitions in JSON where the topic(s) happen to have uppercase > characters. When StartingOffsets is constructed, the original string value > from options is transformed toLowerCase to make matching on "earliest" and > "latest" case insensitive. However, the toLowerCase json is passed to > SpecificOffsets for the terminal condition, so topic names may not be what > the user intended by the time assignments are made with the underlying > KafkaConsumer. > From KafkaSourceProvider: > {code} > val startingOffsets = > caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) > match { > case Some("latest") => LatestOffsets > case Some("earliest") => EarliestOffsets > case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json)) > case None => LatestOffsets > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
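To make the fix direction concrete, a self-contained sketch with simplified stand-in types rather than the actual KafkaSourceProvider internals: compare case-insensitively only for the earliest/latest keywords, and keep the original string for the JSON branch so topic names retain their case.
{code:scala}
// Simplified illustration of case-preserving startingOffsets parsing.
object StartingOffsetsParsing {
  sealed trait StartingOffsets
  case object Earliest extends StartingOffsets
  case object Latest extends StartingOffsets
  final case class Specific(json: String) extends StartingOffsets

  def parse(raw: Option[String]): StartingOffsets = raw.map(_.trim) match {
    case Some(s) if s.equalsIgnoreCase("latest")   => Latest
    case Some(s) if s.equalsIgnoreCase("earliest") => Earliest
    case Some(json)                                => Specific(json) // topic casing preserved
    case None                                      => Latest
  }

  def main(args: Array[String]): Unit = {
    println(parse(Some("""{"MyTopic":{"0":23}}"""))) // Specific({"MyTopic":{"0":23}})
    println(parse(Some("  LATEST ")))                // Latest
  }
}
{code}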
[jira] [Created] (SPARK-19861) watermark should not be a negative time.
Genmao Yu created SPARK-19861: - Summary: watermark should not be a negative time. Key: SPARK-19861 URL: https://issues.apache.org/jira/browse/SPARK-19861 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
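Since the summary is terse, here is a sketch of the case it targets, assuming a local SparkSession and using the rate source purely for illustration: a negative watermark delay such as "-10 seconds" should be rejected during analysis rather than accepted silently.
{code:scala}
// Illustration only: "-10 seconds" is the invalid delay the ticket is about.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object NegativeWatermarkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("negative-watermark")
      .master("local[2]")
      .getOrCreate()

    val events = spark.readStream.format("rate").load()
      .withColumn("eventTime", col("timestamp"))

    // Expected to fail fast once the validation is in place.
    events.withWatermark("eventTime", "-10 seconds")
  }
}
{code}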
[jira] [Created] (SPARK-19822) CheckpointSuite.testCheckpointedOperation: should not check checkpointFilesOfLatestTime by the PATH string.
Genmao Yu created SPARK-19822: - Summary: CheckpointSuite.testCheckpointedOperation: should not check checkpointFilesOfLatestTime by the PATH string. Key: SPARK-19822 URL: https://issues.apache.org/jira/browse/SPARK-19822 Project: Spark Issue Type: Bug Components: Tests Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-19807) Add reason for cancellation when a stage is killed using web UI
[ https://issues.apache.org/jira/browse/SPARK-19807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894185#comment-15894185 ] Genmao Yu edited comment on SPARK-19807 at 3/3/17 11:35 AM: !https://cloud.githubusercontent.com/assets/7402327/23549702/6a0c93f6-0048-11e7-8a3f-bf58befb887b.png! Do you mean the "Job 0 cancelled" in picture? was (Author: unclegen): !https://cloud.githubusercontent.com/assets/7402327/23549478/70888646-0047-11e7-8e2c-e64a3db43711.png! Do you mean the "Job 0 cancelled" in picture? > Add reason for cancellation when a stage is killed using web UI > --- > > Key: SPARK-19807 > URL: https://issues.apache.org/jira/browse/SPARK-19807 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 2.1.0 >Reporter: Jacek Laskowski >Priority: Trivial > > When a user kills a stage using web UI (in Stages page), > {{StagesTab.handleKillRequest}} requests {{SparkContext}} to cancel the stage > without giving a reason. {{SparkContext}} has {{cancelStage(stageId: Int, > reason: String)}} that Spark could use to pass the information for > monitoring/debugging purposes. > {code} > scala> sc.range(0, 5, 1, 1).mapPartitions { nums => { Thread.sleep(60 * > 1000); nums } }.count > {code} > Use http://localhost:4040/stages/ and click Kill. > {code} > org.apache.spark.SparkException: Job 0 cancelled because Stage 0 was cancelled > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1486) > at > org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1426) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1415) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) > at > org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1408) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1656) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1645) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2019) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2040) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2059) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084) > at org.apache.spark.rdd.RDD.count(RDD.scala:1158) > ... 48 elided > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19807) Add reason for cancellation when a stage is killed using web UI
[ https://issues.apache.org/jira/browse/SPARK-19807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894185#comment-15894185 ] Genmao Yu commented on SPARK-19807: --- !https://cloud.githubusercontent.com/assets/7402327/23549478/70888646-0047-11e7-8e2c-e64a3db43711.png! Do you mean the "Job 0 cancelled" in picture? > Add reason for cancellation when a stage is killed using web UI > --- > > Key: SPARK-19807 > URL: https://issues.apache.org/jira/browse/SPARK-19807 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 2.1.0 >Reporter: Jacek Laskowski >Priority: Trivial > > When a user kills a stage using web UI (in Stages page), > {{StagesTab.handleKillRequest}} requests {{SparkContext}} to cancel the stage > without giving a reason. {{SparkContext}} has {{cancelStage(stageId: Int, > reason: String)}} that Spark could use to pass the information for > monitoring/debugging purposes. > {code} > scala> sc.range(0, 5, 1, 1).mapPartitions { nums => { Thread.sleep(60 * > 1000); nums } }.count > {code} > Use http://localhost:4040/stages/ and click Kill. > {code} > org.apache.spark.SparkException: Job 0 cancelled because Stage 0 was cancelled > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1486) > at > org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1426) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1415) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) > at > org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1408) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1656) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1645) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2019) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2040) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2059) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084) > at org.apache.spark.rdd.RDD.count(RDD.scala:1158) > ... 48 elided > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
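For context, the change requested above boils down to passing a human-readable reason through to the scheduler. A minimal sketch, assuming the {{cancelStage(stageId: Int, reason: String)}} overload described in the issue (the real call site is {{StagesTab.handleKillRequest}}):
{code}
import org.apache.spark.SparkContext

// Sketch only: give the scheduler a reason so the resulting SparkException
// tells the user the stage was killed from the Web UI.
def killStageFromUi(sc: SparkContext, stageId: Int): Unit = {
  sc.cancelStage(stageId, s"Stage $stageId was killed from the Web UI")
}
{code}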
[jira] [Updated] (SPARK-19805) Log the row type when query result dose not match
[ https://issues.apache.org/jira/browse/SPARK-19805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-19805: -- Summary: Log the row type when query result dose not match (was: Log the row type when query result dose match) > Log the row type when query result dose not match > - > > Key: SPARK-19805 > URL: https://issues.apache.org/jira/browse/SPARK-19805 > Project: Spark > Issue Type: Improvement > Components: Tests >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19805) Log the row type when query result dose match
Genmao Yu created SPARK-19805: - Summary: Log the row type when query result dose match Key: SPARK-19805 URL: https://issues.apache.org/jira/browse/SPARK-19805 Project: Spark Issue Type: Improvement Components: Tests Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-19349) Check resource ready to avoid multiple receivers to be scheduled on the same node.
[ https://issues.apache.org/jira/browse/SPARK-19349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu closed SPARK-19349. - Resolution: Won't Fix > Check resource ready to avoid multiple receivers to be scheduled on the same > node. > -- > > Key: SPARK-19349 > URL: https://issues.apache.org/jira/browse/SPARK-19349 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu > > Currently, we can only ensure that the registered resources satisfy the > "spark.scheduler.minRegisteredResourcesRatio". But if > "spark.scheduler.minRegisteredResourcesRatio" is set too small, receivers may > still be scheduled onto only a few nodes. In fact, we could give the scheduler one more > chance to wait for sufficient resources so that receivers are scheduled evenly. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
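Although this was closed as Won't Fix, the lever mentioned in the description already exists: make the driver wait for more executors to register before anything (including receivers) is scheduled. A hedged example with illustrative values, using the two existing scheduler settings:
{code}
import org.apache.spark.SparkConf

// Illustrative values only: wait until every requested executor has registered
// (ratio 1.0), or for at most 120s, before the first batch can be scheduled,
// so receivers are less likely to pile up on a few early nodes.
val conf = new SparkConf()
  .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "120s")
{code}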
[jira] [Created] (SPARK-19800) Implement one kind of streaming sampling - reservoir sampling
Genmao Yu created SPARK-19800: - Summary: Implement one kind of streaming sampling - reservoir sampling Key: SPARK-19800 URL: https://issues.apache.org/jira/browse/SPARK-19800 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19738) Consider adding error handler to DataStreamWriter
[ https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887218#comment-15887218 ] Genmao Yu commented on SPARK-19738: --- [~jlalwani] I tested it on latest master branch, and return NULL if meet bad data. > Consider adding error handler to DataStreamWriter > - > > Key: SPARK-19738 > URL: https://issues.apache.org/jira/browse/SPARK-19738 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 2.1.0 >Reporter: Jayesh lalwani > > For Structured streaming implementations, it is important that the > applications stay always On. However, right now, errors stop the driver. In > some cases, this is not desirable behavior. For example, I have the following > application > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").start() > {code} > I send the following input to it > {quote} > 1,Iron man > 2,SUperman > {quote} > Obviously, the data is bad. This causes the executor to throw an exception > that propogates to the driver, which promptly shuts down. The driver is > running in supervised mode, and it gets restarted. The application reads the > same bad input and shuts down again. This goes ad-infinitum. This behavior is > desirable, in cases, the error is recoverable. For example, if the executor > cannot talk to the database, we want the application to keep trying the same > input again and again till the database recovers. However, for some cases, > this behavior is undesirable. We do not want this to happen when the input is > bad. We want to put the bad record in some sort of dead letter queue. Or > maybe we want to kill the driver only when the number of errors have crossed > a certain threshold. Or maybe we want to email someone. > Proposal: > Add a error handler to the data stream. When the executor fails, it should > call the error handler and pass the Exception to the error handler. The error > handler could eat the exception, or transform it, or update counts in an > accumulator, etc > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start() > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
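The comment above about bad rows coming back as NULL matches the CSV reader's default permissive parse mode, in which a field that cannot be cast to the declared type is returned as null instead of failing the query. A minimal sketch of how to observe this (the input path is a placeholder):
{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("permissive-csv-demo").getOrCreate()
val userSchema = new StructType().add("name", "string").add("age", "integer")

// With mode=PERMISSIVE (the default), an input line like "2,SUperman" is kept
// as name = "2", age = null rather than stopping the driver.
val csvDF = spark.readStream
  .schema(userSchema)
  .option("mode", "PERMISSIVE")
  .csv("/tmp/streamingerror/")   // placeholder path

csvDF.writeStream.format("console").start()
{code}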
[jira] [Comment Edited] (SPARK-19738) Consider adding error handler to DataStreamWriter
[ https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885374#comment-15885374 ] Genmao Yu edited comment on SPARK-19738 at 2/27/17 9:44 AM: [~gaaldornick] Sorry I can not reproduce it. What spark version are you testint with? was (Author: unclegen): [~gaaldornick] Sorry I can not reproduce it. > Consider adding error handler to DataStreamWriter > - > > Key: SPARK-19738 > URL: https://issues.apache.org/jira/browse/SPARK-19738 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Jayesh lalwani > > For Structured streaming implementations, it is important that the > applications stay always On. However, right now, errors stop the driver. In > some cases, this is not desirable behavior. For example, I have the following > application > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").start() > {code} > I send the following input to it > {quote} > 1,Iron man > 2,SUperman > {quote} > Obviously, the data is bad. This causes the executor to throw an exception > that propogates to the driver, which promptly shuts down. The driver is > running in supervised mode, and it gets restarted. The application reads the > same bad input and shuts down again. This goes ad-infinitum. This behavior is > desirable, in cases, the error is recoverable. For example, if the executor > cannot talk to the database, we want the application to keep trying the same > input again and again till the database recovers. However, for some cases, > this behavior is undesirable. We do not want this to happen when the input is > bad. We want to put the bad record in some sort of dead letter queue. Or > maybe we want to kill the driver only when the number of errors have crossed > a certain threshold. Or maybe we want to email someone. > Proposal: > Add a error handler to the data stream. When the executor fails, it should > call the error handler and pass the Exception to the error handler. The error > handler could eat the exception, or transform it, or update counts in an > accumulator, etc > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start() > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-19738) Consider adding error handler to DataStreamWriter
[ https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885374#comment-15885374 ] Genmao Yu edited comment on SPARK-19738 at 2/27/17 9:45 AM: [~gaaldornick] Sorry I can not reproduce it. What spark version are you testing with? was (Author: unclegen): [~gaaldornick] Sorry I can not reproduce it. What spark version are you testint with? > Consider adding error handler to DataStreamWriter > - > > Key: SPARK-19738 > URL: https://issues.apache.org/jira/browse/SPARK-19738 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Jayesh lalwani > > For Structured streaming implementations, it is important that the > applications stay always On. However, right now, errors stop the driver. In > some cases, this is not desirable behavior. For example, I have the following > application > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").start() > {code} > I send the following input to it > {quote} > 1,Iron man > 2,SUperman > {quote} > Obviously, the data is bad. This causes the executor to throw an exception > that propogates to the driver, which promptly shuts down. The driver is > running in supervised mode, and it gets restarted. The application reads the > same bad input and shuts down again. This goes ad-infinitum. This behavior is > desirable, in cases, the error is recoverable. For example, if the executor > cannot talk to the database, we want the application to keep trying the same > input again and again till the database recovers. However, for some cases, > this behavior is undesirable. We do not want this to happen when the input is > bad. We want to put the bad record in some sort of dead letter queue. Or > maybe we want to kill the driver only when the number of errors have crossed > a certain threshold. Or maybe we want to email someone. > Proposal: > Add a error handler to the data stream. When the executor fails, it should > call the error handler and pass the Exception to the error handler. The error > handler could eat the exception, or transform it, or update counts in an > accumulator, etc > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start() > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19749) Name socket source with a meaningful name
Genmao Yu created SPARK-19749: - Summary: Name socket source with a meaningful name Key: SPARK-19749 URL: https://issues.apache.org/jira/browse/SPARK-19749 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19738) Consider adding error handler to DataStreamWriter
[ https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885374#comment-15885374 ] Genmao Yu commented on SPARK-19738: --- [~gaaldornick] Sorry I can not reproduce it. > Consider adding error handler to DataStreamWriter > - > > Key: SPARK-19738 > URL: https://issues.apache.org/jira/browse/SPARK-19738 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Jayesh lalwani > > For Structured streaming implementations, it is important that the > applications stay always On. However, right now, errors stop the driver. In > some cases, this is not desirable behavior. For example, I have the following > application > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").start() > {code} > I send the following input to it > {quote} > 1,Iron man > 2,SUperman > {quote} > Obviously, the data is bad. This causes the executor to throw an exception > that propogates to the driver, which promptly shuts down. The driver is > running in supervised mode, and it gets restarted. The application reads the > same bad input and shuts down again. This goes ad-infinitum. This behavior is > desirable, in cases, the error is recoverable. For example, if the executor > cannot talk to the database, we want the application to keep trying the same > input again and again till the database recovers. However, for some cases, > this behavior is undesirable. We do not want this to happen when the input is > bad. We want to put the bad record in some sort of dead letter queue. Or > maybe we want to kill the driver only when the number of errors have crossed > a certain threshold. Or maybe we want to email someone. > Proposal: > Add a error handler to the data stream. When the executor fails, it should > call the error handler and pass the Exception to the error handler. The error > handler could eat the exception, or transform it, or update counts in an > accumulator, etc > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start() > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-19699) createOrReplaceTable does not always replace an existing table of the same name
[ https://issues.apache.org/jira/browse/SPARK-19699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-19699: -- Comment: was deleted (was: Good catch! Maybe we can add {{rdd.id}} or something else. [~cloud_fan] What's your opinion? !https://cloud.githubusercontent.com/assets/7402327/23299586/02f7b73a-fabd-11e6-8daf-321ca9ab5ed0.png!) > createOrReplaceTable does not always replace an existing table of the same > name > --- > > Key: SPARK-19699 > URL: https://issues.apache.org/jira/browse/SPARK-19699 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Barry Becker >Priority: Minor > > There are cases when dataframe.createOrReplaceTempView does not replace an > existing table with the same name. > Please also refer to my [related stack-overflow > post|http://stackoverflow.com/questions/42371690/in-spark-2-1-how-come-the-dataframe-createoreplacetemptable-does-not-replace-an]. > To reproduce, do > {code} > df.collect() > df.createOrReplaceTempView("foo1") > df.sqlContext.cacheTable("foo1") > {code} > with one dataframe, and then do exactly the same thing with a different > dataframe. Then look in the storage tab in the spark UI and see multiple > entries for "foo1" in the "RDD Name" column. > Maybe I am misunderstanding, but this causes 2 apparent problems > 1) How do you know which table will be retrieved with > sqlContext.table("foo1") ? > 2) The duplicate entries represent a memory leak. > I have tried calling dropTempTable(existingName) first, but then have > occasionally seen a FAILFAST error when trying to use the table. It's as if > the dropTempTable is not synchronous, but maybe I am doing something wrong. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19699) createOrReplaceTable does not always replace an existing table of the same name
[ https://issues.apache.org/jira/browse/SPARK-19699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882385#comment-15882385 ] Genmao Yu commented on SPARK-19699: --- Good catch! Maybe we can add {{rdd.id}} or something else. [~cloud_fan] !https://cloud.githubusercontent.com/assets/7402327/23299586/02f7b73a-fabd-11e6-8daf-321ca9ab5ed0.png! > createOrReplaceTable does not always replace an existing table of the same > name > --- > > Key: SPARK-19699 > URL: https://issues.apache.org/jira/browse/SPARK-19699 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Barry Becker >Priority: Minor > > There are cases when dataframe.createOrReplaceTempView does not replace an > existing table with the same name. > Please also refer to my [related stack-overflow > post|http://stackoverflow.com/questions/42371690/in-spark-2-1-how-come-the-dataframe-createoreplacetemptable-does-not-replace-an]. > To reproduce, do > {code} > df.collect() > df.createOrReplaceTempView("foo1") > df.sqlContext.cacheTable("foo1") > {code} > with one dataframe, and then do exactly the same thing with a different > dataframe. Then look in the storage tab in the spark UI and see multiple > entries for "foo1" in the "RDD Name" column. > Maybe I am misunderstanding, but this causes 2 apparent problems > 1) How do you know which table will be retrieved with > sqlContext.table("foo1") ? > 2) The duplicate entries represent a memory leak. > I have tried calling dropTempTable(existingName) first, but then have > occasionally seen a FAILFAST error when trying to use the table. It's as if > the dropTempTable is not synchronous, but maybe I am doing something wrong. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-19699) createOrReplaceTable does not always replace an existing table of the same name
[ https://issues.apache.org/jira/browse/SPARK-19699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882385#comment-15882385 ] Genmao Yu edited comment on SPARK-19699 at 2/24/17 10:16 AM: - Good catch! Maybe we can add {{rdd.id}} or something else. [~cloud_fan] What's your opinion? !https://cloud.githubusercontent.com/assets/7402327/23299586/02f7b73a-fabd-11e6-8daf-321ca9ab5ed0.png! was (Author: unclegen): Good catch! Maybe we can add {{rdd.id}} or something else. [~cloud_fan] !https://cloud.githubusercontent.com/assets/7402327/23299586/02f7b73a-fabd-11e6-8daf-321ca9ab5ed0.png! > createOrReplaceTable does not always replace an existing table of the same > name > --- > > Key: SPARK-19699 > URL: https://issues.apache.org/jira/browse/SPARK-19699 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Barry Becker >Priority: Minor > > There are cases when dataframe.createOrReplaceTempView does not replace an > existing table with the same name. > Please also refer to my [related stack-overflow > post|http://stackoverflow.com/questions/42371690/in-spark-2-1-how-come-the-dataframe-createoreplacetemptable-does-not-replace-an]. > To reproduce, do > {code} > df.collect() > df.createOrReplaceTempView("foo1") > df.sqlContext.cacheTable("foo1") > {code} > with one dataframe, and then do exactly the same thing with a different > dataframe. Then look in the storage tab in the spark UI and see multiple > entries for "foo1" in the "RDD Name" column. > Maybe I am misunderstanding, but this causes 2 apparent problems > 1) How do you know which table will be retrieved with > sqlContext.table("foo1") ? > 2) The duplicate entries represent a memory leak. > I have tried calling dropTempTable(existingName) first, but then have > occasionally seen a FAILFAST error when trying to use the table. It's as if > the dropTempTable is not synchronous, but maybe I am doing something wrong. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
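For anyone trying to reproduce the duplicate Storage-tab entries discussed above, here is a minimal sketch of the sequence from the report (the view name and data are arbitrary):
{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cache-replace-repro").getOrCreate()

val df1 = spark.range(0, 100).toDF("n")
df1.createOrReplaceTempView("foo1")
spark.catalog.cacheTable("foo1")
df1.collect()

// Replace the view with a different DataFrame under the same name and cache again.
val df2 = spark.range(100, 200).toDF("n")
df2.createOrReplaceTempView("foo1")
spark.catalog.cacheTable("foo1")
df2.collect()

// The Storage tab may now show more than one "In-memory table foo1" RDD, which
// is the ambiguity (and apparent leak) described in the issue; tagging the RDD
// name with something unique such as rdd.id is the disambiguation suggested above.
{code}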
[jira] [Closed] (SPARK-19642) Improve the security guarantee for rest api and ui
[ https://issues.apache.org/jira/browse/SPARK-19642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu closed SPARK-19642. - Resolution: Won't Fix > Improve the security guarantee for rest api and ui > -- > > Key: SPARK-19642 > URL: https://issues.apache.org/jira/browse/SPARK-19642 > Project: Spark > Issue Type: Task > Components: Spark Core >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu > > As Spark gets more and more features, data may start leaking through other > places (e.g. SQL query plans which are shown in the UI). Also current rest > api may be a security hole. Open this JIRA to research and address the > potential security flaws. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19676) Flaky test: FsHistoryProviderSuite.SPARK-3697: ignore directories that cannot be read.
Genmao Yu created SPARK-19676: - Summary: Flaky test: FsHistoryProviderSuite.SPARK-3697: ignore directories that cannot be read. Key: SPARK-19676 URL: https://issues.apache.org/jira/browse/SPARK-19676 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19642) Improve the security guarantee for rest api and ui
[ https://issues.apache.org/jira/browse/SPARK-19642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-19642: -- Summary: Improve the security guarantee for rest api and ui (was: Improve the security guarantee for rest api) > Improve the security guarantee for rest api and ui > -- > > Key: SPARK-19642 > URL: https://issues.apache.org/jira/browse/SPARK-19642 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu >Priority: Critical > > As Spark gets more and more features, data may start leaking through other > places (e.g. SQL query plans which are shown in the UI). Also current rest > api may be a security hole. Open this JIRA to research and address the > potential security flaws. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19642) Improve the security guarantee for rest api
[ https://issues.apache.org/jira/browse/SPARK-19642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-19642: -- Description: As Spark gets more and more features, data may start leaking through other places (e.g. SQL query plans which are shown in the UI). Also current rest api may be a security hole. Open this JIRA to research and address the potential security flaws. (was: As Spark gets more and more features, data may start leaking through other places (e.g. SQL query plans which are shown in the UI). Also current rest api may be a security hole. Opening this JIRA to research and address the potential security flaws.) > Improve the security guarantee for rest api > --- > > Key: SPARK-19642 > URL: https://issues.apache.org/jira/browse/SPARK-19642 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu >Priority: Critical > > As Spark gets more and more features, data may start leaking through other > places (e.g. SQL query plans which are shown in the UI). Also current rest > api may be a security hole. Open this JIRA to research and address the > potential security flaws. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19642) Improve the security guarantee for rest api
[ https://issues.apache.org/jira/browse/SPARK-19642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-19642: -- Description: As Spark gets more and more features, data may start leaking through other places (e.g. SQL query plans which are shown in the UI). Also current rest api may be a security hole. Opening this JIRA to research and address the potential security flaws. (was: As Spark gets more and more features, data may start leaking through other places (e.g. SQL query plans which are shown in the UI). And current rest api may be a security hole. Opening this JIRA to research and address the potential security flaws.) > Improve the security guarantee for rest api > --- > > Key: SPARK-19642 > URL: https://issues.apache.org/jira/browse/SPARK-19642 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu >Priority: Critical > > As Spark gets more and more features, data may start leaking through other > places (e.g. SQL query plans which are shown in the UI). Also current rest > api may be a security hole. Opening this JIRA to research and address the > potential security flaws. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19642) Improve the security guarantee for rest api
[ https://issues.apache.org/jira/browse/SPARK-19642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871058#comment-15871058 ] Genmao Yu commented on SPARK-19642: --- cc [~ajbozarth], [~vanzin] and [~srowen] > Improve the security guarantee for rest api > --- > > Key: SPARK-19642 > URL: https://issues.apache.org/jira/browse/SPARK-19642 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu >Priority: Critical > > As Spark gets more and more features, data may start leaking through other > places (e.g. SQL query plans which are shown in the UI). And current rest api > may be a security hole. Opening this JIRA to research and address the > potential security flaws. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19642) Improve the security guarantee for rest api
Genmao Yu created SPARK-19642: - Summary: Improve the security guarantee for rest api Key: SPARK-19642 URL: https://issues.apache.org/jira/browse/SPARK-19642 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu Priority: Critical As Spark gets more and more features, data may start leaking through other places (e.g. SQL query plans which are shown in the UI). And current rest api may be a security hole. Opening this JIRA to research and address the potential security flaws. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19605) Fail it if existing resource is not enough to run streaming job
Genmao Yu created SPARK-19605: - Summary: Fail it if existing resource is not enough to run streaming job Key: SPARK-19605 URL: https://issues.apache.org/jira/browse/SPARK-19605 Project: Spark Issue Type: Improvement Components: DStreams Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19556) Broadcast data is not encrypted when I/O encryption is on
[ https://issues.apache.org/jira/browse/SPARK-19556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867152#comment-15867152 ] Genmao Yu commented on SPARK-19556: --- [~vanzin] I am working on this, could you please assign it to me? > Broadcast data is not encrypted when I/O encryption is on > - > > Key: SPARK-19556 > URL: https://issues.apache.org/jira/browse/SPARK-19556 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Marcelo Vanzin > > {{TorrentBroadcast}} uses a couple of "back doors" into the block manager to > write and read data: > {code} > if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, > tellMaster = true)) { > throw new SparkException(s"Failed to store $pieceId of $broadcastId > in local BlockManager") > } > {code} > {code} > bm.getLocalBytes(pieceId) match { > case Some(block) => > blocks(pid) = block > releaseLock(pieceId) > case None => > bm.getRemoteBytes(pieceId) match { > case Some(b) => > if (checksumEnabled) { > val sum = calcChecksum(b.chunks(0)) > if (sum != checksums(pid)) { > throw new SparkException(s"corrupt remote block $pieceId of > $broadcastId:" + > s" $sum != ${checksums(pid)}") > } > } > // We found the block from remote executors/driver's > BlockManager, so put the block > // in this executor's BlockManager. > if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, > tellMaster = true)) { > throw new SparkException( > s"Failed to store $pieceId of $broadcastId in local > BlockManager") > } > blocks(pid) = b > case None => > throw new SparkException(s"Failed to get $pieceId of > $broadcastId") > } > } > {code} > The thing these block manager methods have in common is that they bypass the > encryption code; so broadcast data is stored unencrypted in the block > manager, causing unencrypted data to be written to disk if those blocks need > to be evicted from memory. > The correct fix here is actually not to change {{TorrentBroadcast}}, but to > fix the block manager so that: > - data stored in memory is not encrypted > - data written to disk is encrypted > This would simplify the code paths that use BlockManager / SerializerManager > APIs (e.g. see SPARK-19520), but requires some tricky changes inside the > BlockManager to still be able to use file channels to avoid reading whole > blocks back into memory so they can be decrypted. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19524) newFilesOnly does not work according to docs.
[ https://issues.apache.org/jira/browse/SPARK-19524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15858819#comment-15858819 ] Genmao Yu commented on SPARK-19524: --- Current implementation will clear the old time-to-files mappings based on the {{minRememberDurationS}}. > newFilesOnly does not work according to docs. > -- > > Key: SPARK-19524 > URL: https://issues.apache.org/jira/browse/SPARK-19524 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.2 >Reporter: Egor Pahomov > > Docs says: > newFilesOnly > Should process only new files and ignore existing files in the directory > It's not working. > http://stackoverflow.com/questions/29852249/how-spark-streaming-identifies-new-files > says, that it shouldn't work as expected. > https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala > not clear at all in terms, what code tries to do -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
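To make the remembered-window behaviour concrete, here is an illustration (made-up names, not the actual FileInputDStream code) of the two pieces involved: a file counts as new only if its modification time falls inside the currently remembered window, and time-to-files mappings older than the remember duration are dropped each batch.
{code}
import scala.collection.mutable

// Illustration only: "new" means modified within the remembered window.
def isNewFile(modTimeMs: Long, batchTimeMs: Long, rememberDurationMs: Long): Boolean =
  modTimeMs >= batchTimeMs - rememberDurationMs && modTimeMs <= batchTimeMs

// Clearing old time-to-files mappings, as mentioned in the comment above.
def clearOldMappings(timeToFiles: mutable.Map[Long, Seq[String]],
                     batchTimeMs: Long,
                     rememberDurationMs: Long): Unit =
  timeToFiles.retain((time, _) => time >= batchTimeMs - rememberDurationMs)
{code}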
[jira] [Created] (SPARK-19482) Fail it if 'spark.master' is set with different value
Genmao Yu created SPARK-19482: - Summary: Fail it if 'spark.master' is set with different value Key: SPARK-19482 URL: https://issues.apache.org/jira/browse/SPARK-19482 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.1.0, 2.0.2 Reporter: Genmao Yu First, there is no need to set 'spark.master' multiple times with different values. Second, users may set a 'spark.master' in code that differs from the one passed to the `spark-submit` command, which is confusing. So we should check whether 'spark.master' already exists in the settings and whether the previous value matches the current one, and throw an IllegalArgumentException when the two values differ. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
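A minimal sketch of the proposed check, written against a plain key/value map rather than the real SparkConf internals (all names here are illustrative):
{code}
import scala.collection.mutable

// Setting the same key twice is fine only if the value does not change;
// otherwise fail fast instead of silently keeping one of the two values.
def setOnceConsistently(settings: mutable.Map[String, String], key: String, value: String): Unit =
  settings.get(key) match {
    case Some(previous) if previous != value =>
      throw new IllegalArgumentException(
        s"$key is already set to '$previous' and cannot be silently overridden with '$value'")
    case _ => settings(key) = value
  }

val settings = mutable.Map("spark.master" -> "yarn")
setOnceConsistently(settings, "spark.master", "yarn")        // ok: same value
// setOnceConsistently(settings, "spark.master", "local[4]") // throws IllegalArgumentException
{code}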
[jira] [Commented] (SPARK-19451) Long values in Window function
[ https://issues.apache.org/jira/browse/SPARK-19451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853773#comment-15853773 ] Genmao Yu commented on SPARK-19451: --- [~jchamp] I have taken a fast look through the code, and did not find any strong point to set the type of index as Int. In the window function, there is really a underlying integer overflow issue. I have make a pull request (https://github.com/apache/spark/pull/16818), and any suggestion is appreciated. > Long values in Window function > -- > > Key: SPARK-19451 > URL: https://issues.apache.org/jira/browse/SPARK-19451 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.1, 2.0.2 >Reporter: Julien Champ > > Hi there, > there seems to be a major limitation in spark window functions and > rangeBetween method. > If I have the following code : > {code:title=Exemple |borderStyle=solid} > val tw = Window.orderBy("date") > .partitionBy("id") > .rangeBetween( from , 0) > {code} > Everything seems ok, while *from* value is not too large... Even if the > rangeBetween() method supports Long parameters. > But If i set *-216000L* value to *from* it does not work ! > It is probably related to this part of code in the between() method, of the > WindowSpec class, called by rangeBetween() > {code:title=between() method|borderStyle=solid} > val boundaryStart = start match { > case 0 => CurrentRow > case Long.MinValue => UnboundedPreceding > case x if x < 0 => ValuePreceding(-start.toInt) > case x if x > 0 => ValueFollowing(start.toInt) > } > {code} > ( look at this *.toInt* ) > Does anybody know it there's a way to solve / patch this behavior ? > Any help will be appreciated > Thx -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-19451) Long values in Window function
[ https://issues.apache.org/jira/browse/SPARK-19451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853773#comment-15853773 ] Genmao Yu edited comment on SPARK-19451 at 2/6/17 9:58 AM: --- [~jchamp] I have taken a fast look through the code, and did not find any strong point to set the type of index as Int. In the window function, there is really a underlying integer overflow issue. I made a pull request (https://github.com/apache/spark/pull/16818), and any suggestion is appreciated. was (Author: unclegen): [~jchamp] I have taken a fast look through the code, and did not find any strong point to set the type of index as Int. In the window function, there is really a underlying integer overflow issue. I have make a pull request (https://github.com/apache/spark/pull/16818), and any suggestion is appreciated. > Long values in Window function > -- > > Key: SPARK-19451 > URL: https://issues.apache.org/jira/browse/SPARK-19451 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.1, 2.0.2 >Reporter: Julien Champ > > Hi there, > there seems to be a major limitation in spark window functions and > rangeBetween method. > If I have the following code : > {code:title=Exemple |borderStyle=solid} > val tw = Window.orderBy("date") > .partitionBy("id") > .rangeBetween( from , 0) > {code} > Everything seems ok, while *from* value is not too large... Even if the > rangeBetween() method supports Long parameters. > But If i set *-216000L* value to *from* it does not work ! > It is probably related to this part of code in the between() method, of the > WindowSpec class, called by rangeBetween() > {code:title=between() method|borderStyle=solid} > val boundaryStart = start match { > case 0 => CurrentRow > case Long.MinValue => UnboundedPreceding > case x if x < 0 => ValuePreceding(-start.toInt) > case x if x > 0 => ValueFollowing(start.toInt) > } > {code} > ( look at this *.toInt* ) > Does anybody know it there's a way to solve / patch this behavior ? > Any help will be appreciated > Thx -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19451) Long values in Window function
[ https://issues.apache.org/jira/browse/SPARK-19451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853434#comment-15853434 ] Genmao Yu commented on SPARK-19451: --- Good catch! I will dig deeply into code and fix it if it is really a bug. > Long values in Window function > -- > > Key: SPARK-19451 > URL: https://issues.apache.org/jira/browse/SPARK-19451 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.1, 2.0.2 >Reporter: Julien Champ > > Hi there, > there seems to be a major limitation in spark window functions and > rangeBetween method. > If I have the following code : > {code:title=Exemple |borderStyle=solid} > val tw = Window.orderBy("date") > .partitionBy("id") > .rangeBetween( from , 0) > {code} > Everything seems ok, while *from* value is not too large... Even if the > rangeBetween() method supports Long parameters. > But If i set *-216000L* value to *from* it does not work ! > It is probably related to this part of code in the between() method, of the > WindowSpec class, called by rangeBetween() > {code:title=between() method|borderStyle=solid} > val boundaryStart = start match { > case 0 => CurrentRow > case Long.MinValue => UnboundedPreceding > case x if x < 0 => ValuePreceding(-start.toInt) > case x if x > 0 => ValueFollowing(start.toInt) > } > {code} > ( look at this *.toInt* ) > Does anybody know it there's a way to solve / patch this behavior ? > Any help will be appreciated > Thx -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
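To see why the {{.toInt}} calls in {{between()}} are the problem, here is a quick illustration of the silent truncation; the boundary value is arbitrary, just larger than Int.MaxValue in magnitude:
{code}
val start: Long = -2160000000L   // a rangeBetween boundary that does not fit in an Int
val preceding = -start.toInt     // start.toInt wraps to 2134967296, so this is -2134967296
// ValuePreceding(-start.toInt) therefore receives a wrong (and here negative)
// distance, silently changing the window frame instead of erroring out.
println(preceding)               // -2134967296
{code}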
[jira] [Commented] (SPARK-19407) defaultFS is used FileSystem.get instead of getting it from uri scheme
[ https://issues.apache.org/jira/browse/SPARK-19407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853426#comment-15853426 ] Genmao Yu commented on SPARK-19407: --- [~aassudani] Are you still working on this? As this issue is clear and easy to fix, I will making a pr later if it is busy for you to work on this. > defaultFS is used FileSystem.get instead of getting it from uri scheme > -- > > Key: SPARK-19407 > URL: https://issues.apache.org/jira/browse/SPARK-19407 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Amit Assudani > Labels: checkpoint, filesystem, starter, streaming > > Caused by: java.lang.IllegalArgumentException: Wrong FS: > s3a://**/checkpoint/7b2231a3-d845-4740-bfa3-681850e5987f/metadata, > expected: file:/// > at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649) > at > org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82) > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601) > at > org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421) > at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426) > at > org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) > at > org.apache.spark.sql.execution.streaming.StreamExecution.(StreamExecution.scala:100) > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > Can easily replicate on spark standalone cluster by providing checkpoint > location uri scheme anything other than "file://" and not overriding in > config. > WorkAround --conf spark.hadoop.fs.defaultFS=s3a://somebucket or set it in > sparkConf or spark-default.conf -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
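The "Wrong FS" error above comes from resolving the checkpoint path against {{fs.defaultFS}} instead of the path's own scheme. A minimal sketch of the difference using only standard Hadoop APIs (the bucket name is a placeholder, and an s3a:// path additionally needs the hadoop-aws connector on the classpath):
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val hadoopConf = new Configuration()
val metadataPath = new Path("s3a://some-bucket/checkpoint/metadata")  // placeholder

// Resolves against fs.defaultFS (often file:///), so using the result with an
// s3a:// path later fails with "Wrong FS: ... expected: file:///".
val defaultFs: FileSystem = FileSystem.get(hadoopConf)

// Resolves the filesystem from the path's own scheme, which is what the
// checkpoint code should do; overriding fs.defaultFS, as in the workaround
// noted in the issue, only papers over the difference.
val schemeFs: FileSystem = metadataPath.getFileSystem(hadoopConf)
{code}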
[jira] [Commented] (SPARK-19147) netty throw NPE
[ https://issues.apache.org/jira/browse/SPARK-19147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837333#comment-15837333 ] Genmao Yu commented on SPARK-19147: --- After dig into code, this issue may occurs when executor is existing and at the same time some tasks are running. There are some scenes in which this NPE will be thrown: 1. AM re-registers after a failure, and this will resets the state of CoarseGrainedSchedulerBackend to the initial state, then old executors will be removed. This will only occurs in yarn-client mode. 2. Master may will remove executor when a work is lost, and at the same time some tasks may be running. 3. Some other scenes i do not know. As for as I know, this NPE harms nothing, but may confuse users. Maybe, we should improve this exception message to tell users it is safe. What is your opinion? [~zsxwing] > netty throw NPE > --- > > Key: SPARK-19147 > URL: https://issues.apache.org/jira/browse/SPARK-19147 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: cen yuhai > > {code} > 17/01/10 19:17:20 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) > from bigdata-hdp-apache1828.xg01.diditaxi.com:7337 > java.lang.NullPointerException: group > at io.netty.bootstrap.AbstractBootstrap.group(AbstractBootstrap.java:80) > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:203) > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:181) > at > org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) > at > org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:354) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.findNextInnerJoinRows$(Unknown > Source) > at > 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396) > at > org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:138) > at > org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948) > at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) > at >
[jira] [Comment Edited] (SPARK-19147) netty throw NPE
[ https://issues.apache.org/jira/browse/SPARK-19147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837333#comment-15837333 ] Genmao Yu edited comment on SPARK-19147 at 1/25/17 7:39 AM: After dig into code, this issue may occurs when executor is existing and at the same time some tasks are running. There are some scenes in which this NPE will be thrown: 1. AM re-registers after a failure, and this will resets the state of CoarseGrainedSchedulerBackend to the initial state, then old executors will be removed. This will only occurs in yarn-client mode. 2. Master may will remove executor when a work is lost, and at the same time some tasks may be running. 3. Some other scenes i do not know. As far as I know, this NPE harms nothing, but may confuse users. Maybe, we should improve this exception message to tell users it is safe. What is your opinion? [~zsxwing] was (Author: unclegen): After dig into code, this issue may occurs when executor is existing and at the same time some tasks are running. There are some scenes in which this NPE will be thrown: 1. AM re-registers after a failure, and this will resets the state of CoarseGrainedSchedulerBackend to the initial state, then old executors will be removed. This will only occurs in yarn-client mode. 2. Master may will remove executor when a work is lost, and at the same time some tasks may be running. 3. Some other scenes i do not know. As for as I know, this NPE harms nothing, but may confuse users. Maybe, we should improve this exception message to tell users it is safe. What is your opinion? [~zsxwing] > netty throw NPE > --- > > Key: SPARK-19147 > URL: https://issues.apache.org/jira/browse/SPARK-19147 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: cen yuhai > > {code} > 17/01/10 19:17:20 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) > from bigdata-hdp-apache1828.xg01.diditaxi.com:7337 > java.lang.NullPointerException: group > at io.netty.bootstrap.AbstractBootstrap.group(AbstractBootstrap.java:80) > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:203) > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:181) > at > org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) > at > org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:354) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.findNextInnerJoinRows$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396) > at > org.apache.spark.sql.execution.columnar.InMemoryRelation$$
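To illustrate the suggestion above (improve the message rather than change behaviour), here is a minimal, hypothetical sketch; it is not the actual Spark code. BlockClient, FriendlyFetch and fetchOrExplain are made-up names standing in for the shuffle-client call that throws the NPE once the transport layer has already been shut down.
{code}
import java.io.IOException

// Stand-in for the fetch entry point (e.g. ExternalShuffleClient.fetchBlocks).
trait BlockClient {
  def fetchBlocks(host: String, port: Int, blockIds: Seq[String]): Unit
}

object FriendlyFetch {
  // Wrap the fetch so the NPE from an already-closed client surfaces with a
  // message that tells the user it is expected during executor shutdown.
  def fetchOrExplain(client: BlockClient, host: String, port: Int, blockIds: Seq[String]): Unit = {
    try {
      client.fetchBlocks(host, port, blockIds)
    } catch {
      case e: NullPointerException =>
        throw new IOException(
          "Block fetch attempted after the shuffle client was shut down; " +
            "this can happen while an executor is exiting and is usually harmless.", e)
    }
  }
}
{code}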
[jira] [Commented] (SPARK-19354) Killed tasks are getting marked as FAILED
[ https://issues.apache.org/jira/browse/SPARK-19354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837048#comment-15837048 ] Genmao Yu commented on SPARK-19354: --- IMHO, killed tasks will eventually end up failed anyway, so there is no strong reason to separate KILLED from FAILED. > Killed tasks are getting marked as FAILED > - > > Key: SPARK-19354 > URL: https://issues.apache.org/jira/browse/SPARK-19354 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Reporter: Devaraj K >Priority: Minor > > When we enable speculation, multiple attempts can run for the same task when > the first attempt's progress is slow. If any of the task attempts succeeds, > the other attempts are killed; while being killed, those attempts get marked > as FAILED due to the error below. We need to handle this error and mark the > attempt as KILLED instead of FAILED. > ||93 ||214 ||1 (speculative) ||FAILED||ANY ||1 / > xx.xx.xx.x2 > stdout > stderr > ||2017/01/24 10:30:44 ||0.2 s ||0.0 B / 0 ||8.0 KB / 400 > ||java.io.IOException: Failed on local exception: > java.nio.channels.ClosedByInterruptException; Host Details : local host is: > "node2/xx.xx.xx.x2"; destination host is: "node1":9000; > +details|| > {code:xml} > 17/01/23 23:54:32 INFO Executor: Executor is trying to kill task 93.1 in > stage 1.0 (TID 214) > 17/01/23 23:54:32 INFO FileOutputCommitter: File Output Committer Algorithm > version is 1 > 17/01/23 23:54:32 ERROR Executor: Exception in task 93.1 in stage 1.0 (TID > 214) > java.io.IOException: Failed on local exception: > java.nio.channels.ClosedByInterruptException; Host Details : local host is: > "stobdtserver3/10.224.54.70"; destination host is: "stobdtserver2":9000; > at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776) > at org.apache.hadoop.ipc.Client.call(Client.java:1479) > at org.apache.hadoop.ipc.Client.call(Client.java:1412) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) > at com.sun.proxy.$Proxy17.create(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy18.create(Unknown Source) > at > org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1648) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624) > at > org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448) > at > org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911) > at 
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:804) > at > org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) > at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1133) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1124) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88) > at org.apache.spark.scheduler.Task.run(Task.scala:114) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.nio.channels.ClosedByInterruptException > at > java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibl
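As a rough illustration of the idea in the description (report an interrupt-caused error on a kill-requested attempt as KILLED rather than FAILED), the sketch below uses made-up types such as AttemptOutcome and AttemptClassifier; it is not Spark's actual TaskEndReason handling, only the classification logic in miniature.
{code}
import java.nio.channels.ClosedByInterruptException

sealed trait AttemptOutcome
case object Killed extends AttemptOutcome
final case class Failed(cause: Throwable) extends AttemptOutcome

object AttemptClassifier {
  // Walk the cause chain looking for interrupt-related exceptions, since the
  // kill shows up as an IOException caused by ClosedByInterruptException.
  private def causedByInterrupt(t: Throwable): Boolean = t match {
    case null => false
    case _: InterruptedException | _: ClosedByInterruptException => true
    case other => causedByInterrupt(other.getCause)
  }

  // If the driver asked to kill this attempt (e.g. a redundant speculative copy),
  // an interrupt-induced error should be reported as KILLED, not FAILED.
  def classify(killRequested: Boolean, cause: Throwable): AttemptOutcome =
    if (killRequested && causedByInterrupt(cause)) Killed else Failed(cause)
}
{code}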
[jira] [Commented] (SPARK-10141) Number of tasks on executors still become negative after failures
[ https://issues.apache.org/jira/browse/SPARK-10141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837043#comment-15837043 ] Genmao Yu commented on SPARK-10141: --- I think this was fixed in https://github.com/apache/spark/pull/14969; the fix version is Spark 2.0.0. > Number of tasks on executors still become negative after failures > - > > Key: SPARK-10141 > URL: https://issues.apache.org/jira/browse/SPARK-10141 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 1.5.0 >Reporter: Joseph K. Bradley >Priority: Minor > Attachments: Screen Shot 2015-08-20 at 3.14.49 PM.png > > > I hit this failure when running LDA on EC2 (after I made the model size > really big). > I was using the LDAExample.scala code on an EC2 cluster with 16 workers > (r3.2xlarge), on a Wikipedia dataset: > {code} > Training set size (documents) 4534059 > Vocabulary size (terms) 1 > Training set size (tokens)895575317 > EM optimizer > 1K topics > {code} > Failure message: > {code} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 55 in > stage 22.0 failed 4 times, most recent failure: Lost task 55.3 in stage 22.0 > (TID 2881, 10.0.202.128): java.io.IOException: Failed to connect to > /10.0.202.128:54740 > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193) > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) > at > org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) > at > org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.net.ConnectException: Connection refused: /10.0.202.128:54740 > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) > at > io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) > at > io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > ... 
1 more > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1267) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1255) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1254) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1254) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:684) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1480) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1442) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1431) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.
[jira] [Commented] (SPARK-19356) Number of active tasks is negative even when there is no failed executor
[ https://issues.apache.org/jira/browse/SPARK-19356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837041#comment-15837041 ] Genmao Yu commented on SPARK-19356: --- I think this was fixed in https://github.com/apache/spark/pull/14969; the fix version is Spark 2.0.0. > Number of active tasks is negative even when there is no failed executor > > > Key: SPARK-19356 > URL: https://issues.apache.org/jira/browse/SPARK-19356 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 1.6.0 >Reporter: Lan Jiang >Priority: Minor > Attachments: Screen Shot 2017-01-24 at 4.39.09 PM.png > > > The number of active tasks shown in the UI is negative, even when there is no > failed executor. It is different from > https://issues.apache.org/jira/browse/SPARK-10141, which relates to a failed > executor. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-18563) mapWithState: initialState should have a timeout setting per record
[ https://issues.apache.org/jira/browse/SPARK-18563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-18563: -- Comment: was deleted (was: I do not know is there any plan to add new feature to DStreams? Maybe, we should focus on Structured Streaming? [~zsxwing]) > mapWithState: initialState should have a timeout setting per record > --- > > Key: SPARK-18563 > URL: https://issues.apache.org/jira/browse/SPARK-18563 > Project: Spark > Issue Type: Improvement > Components: DStreams >Reporter: Daniel Haviv > > When passing an initialState for mapWithState, there should be a possibility to > set a timeout at the record level. > If, for example, mapWithState is configured with a 48H timeout, loading an > initialState will cause the state to bloat and hold 96H of data and then > release 48H of data at once. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-19343) Do once optimistic checkpoint before stop
[ https://issues.apache.org/jira/browse/SPARK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu closed SPARK-19343. - Resolution: Won't Fix > Do once optimistic checkpoint before stop > - > > Key: SPARK-19343 > URL: https://issues.apache.org/jira/browse/SPARK-19343 > Project: Spark > Issue Type: Improvement > Components: DStreams >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu > > A streaming job that restarts from a checkpoint has to rebuild several batches > until it finds the latest checkpointed RDD. So we could do one optimistic > checkpoint just before stopping, reducing unnecessary recomputation. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
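For context, a minimal sketch of where such a final checkpoint would sit relative to today's graceful stop. Only the ssc.stop(...) call is a real API; checkpointBeforeStop is a purely hypothetical hook illustrating the proposal, which was closed as Won't Fix.
{code}
import org.apache.spark.streaming.StreamingContext

// Illustration only: a graceful stop drains in-flight batches, but there is no
// public hook that forces one last checkpoint; the proposal would add one at
// roughly the commented-out line.
def shutdown(ssc: StreamingContext): Unit = {
  // checkpointBeforeStop(ssc)   // hypothetical "optimistic checkpoint" hook
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
{code}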
[jira] [Comment Edited] (SPARK-18563) mapWithState: initialState should have a timeout setting per record
[ https://issues.apache.org/jira/browse/SPARK-18563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833416#comment-15833416 ] Genmao Yu edited comment on SPARK-18563 at 1/22/17 9:35 AM: I do not know whether there is any plan to add new features to DStreams. Maybe we should focus on Structured Streaming? [~zsxwing] was (Author: unclegen): I do not know is there any plan to add new feature to DStreams? Maybe, we should focus on Structured Streaming? > mapWithState: initialState should have a timeout setting per record > --- > > Key: SPARK-18563 > URL: https://issues.apache.org/jira/browse/SPARK-18563 > Project: Spark > Issue Type: Improvement > Components: DStreams >Reporter: Daniel Haviv > > When passing an initialState for mapWithState, there should be a possibility to > set a timeout at the record level. > If, for example, mapWithState is configured with a 48H timeout, loading an > initialState will cause the state to bloat and hold 96H of data and then > release 48H of data at once. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18563) mapWithState: initialState should have a timeout setting per record
[ https://issues.apache.org/jira/browse/SPARK-18563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833416#comment-15833416 ] Genmao Yu commented on SPARK-18563: --- I do not know whether there is any plan to add new features to DStreams. Maybe we should focus on Structured Streaming? > mapWithState: initialState should have a timeout setting per record > --- > > Key: SPARK-18563 > URL: https://issues.apache.org/jira/browse/SPARK-18563 > Project: Spark > Issue Type: Improvement > Components: DStreams >Reporter: Daniel Haviv > > When passing an initialState for mapWithState, there should be a possibility to > set a timeout at the record level. > If, for example, mapWithState is configured with a 48H timeout, loading an > initialState will cause the state to bloat and hold 96H of data and then > release 48H of data at once. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
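For context on the request above, a minimal sketch of how mapWithState is configured today: StateSpec takes a single idle timeout that applies uniformly to every key, which is why all records loaded through initialState start their timeout at load time and expire together. The stream wiring, key names, and durations below are illustrative only.
{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}

val conf = new SparkConf().setAppName("mapWithState-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("/tmp/mapWithState-sketch")   // mapWithState requires checkpointing

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Warm-start state: every preloaded key starts the same timeout from "now".
val initial = ssc.sparkContext.parallelize(Seq(("spark", 10), ("streaming", 5)))

val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  if (state.isTimingOut()) {
    // Final call for an expiring key; the state can no longer be updated.
    (word, state.getOption.getOrElse(0))
  } else {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (word, sum)
  }
}

val counts = pairs.mapWithState(
  StateSpec.function(mappingFunc)
    .initialState(initial)
    .timeout(Minutes(48 * 60)))   // one global idle timeout for all keys

counts.print()
// ssc.start(); ssc.awaitTermination()  -- omitted in this sketch
{code}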
[jira] [Commented] (SPARK-18839) Executor is active on web, but actually is dead
[ https://issues.apache.org/jira/browse/SPARK-18839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833409#comment-15833409 ] Genmao Yu commented on SPARK-18839: --- Sorry, I do not think this is a bug. > Executor is active on web, but actually is dead > --- > > Key: SPARK-18839 > URL: https://issues.apache.org/jira/browse/SPARK-18839 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: meiyoula >Priority: Minor > > When a container is preempted, the AM sees it as completed and the driver > removes its block manager. But the executor only actually dies a few seconds > later; during this period it updates blocks and re-registers the block > manager, so the executors page shows the executor as active. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-18805) InternalMapWithStateDStream make java.lang.StackOverflowError
[ https://issues.apache.org/jira/browse/SPARK-18805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833377#comment-15833377 ] Genmao Yu commented on SPARK-18805: --- +1 to {{That should be not an infinite loop. The time is different on each call.}} And there really is a potential {{StackOverflowError}} issue, but in my understanding it is hard to hit. Could you please provide your settings for checkpointDuration and batchDuration? > InternalMapWithStateDStream make java.lang.StackOverflowError > -- > > Key: SPARK-18805 > URL: https://issues.apache.org/jira/browse/SPARK-18805 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 1.6.3, 2.0.2 > Environment: mesos >Reporter: etienne > > When loading InternalMapWithStateDStream from a checkpoint, > if isValidTime is true and there is no generatedRDD at the given time, > there is an infinite loop: > 1) compute is called on InternalMapWithStateDStream > 2) InternalMapWithStateDStream tries to generate the previous RDD > 3) The stream looks in generatedRDD to see whether the RDD is already generated for the given > time > 4) It does not find the RDD, so it checks whether the time is valid > 5) If the time is valid, compute is called on InternalMapWithStateDStream > 6) Restart from 1) > Here is the exception that illustrates this error > {code} > Exception in thread "streaming-start" java.lang.StackOverflowError > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) > at scala.Option.orElse(Option.scala:289) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) > at > org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) > at scala.Option.orElse(Option.scala:289) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) > at > org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134) 
> at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
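The loop described in steps 1) through 6) is mutual recursion between compute and getOrCompute rather than a true infinite loop: each call asks for an earlier batch time, but a large enough gap between the recovered time and the last generated RDD can exhaust the stack first. The toy model below is deliberately simplified and is not Spark's DStream code; ToyStateStream and its members are hypothetical names used only to show the shape of the recursion.
{code}
import scala.collection.mutable

class ToyStateStream(zeroTime: Long, slideMs: Long) {
  private val generated = mutable.Map[Long, String]()   // stands in for generatedRDDs

  private def isValidTime(t: Long): Boolean =
    t >= zeroTime && (t - zeroTime) % slideMs == 0

  // getOrCompute: return the memoized result, otherwise compute it for a valid time.
  def getOrCompute(t: Long): Option[String] =
    generated.get(t).orElse {
      if (isValidTime(t)) {
        val r = compute(t)
        r.foreach(v => generated(t) = v)
        r
      } else None
    }

  // compute: needs the previous batch first, so it recurses one slide back.
  // After checkpoint recovery, if nothing between zeroTime and t is memoized,
  // the chain is (t - zeroTime) / slideMs calls deep and can overflow the stack.
  def compute(t: Long): Option[String] = {
    val prev = if (t == zeroTime) Some("initial") else getOrCompute(t - slideMs)
    prev.map(p => s"state@$t from [$p]")
  }
}

// e.g. new ToyStateStream(0L, 1000L).getOrCompute(10000L * 1000L) needs roughly
// 10k nested frames and may end in java.lang.StackOverflowError.
{code}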
[jira] [Updated] (SPARK-18116) spark streaming ui show 0 events when recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-18116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Genmao Yu updated SPARK-18116: -- Target Version/s: (was: 2.0.3, 2.1.1) Fix Version/s: (was: 2.1.1) (was: 2.0.3) > spark streaming ui show 0 events when recovering from checkpoint > > > Key: SPARK-18116 > URL: https://issues.apache.org/jira/browse/SPARK-18116 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 1.6.1 > Environment: jdk1.8.0_77 Red Hat 4.4.7-11 >Reporter: liujianhui >Priority: Minor > Original Estimate: 1h > Remaining Estimate: 1h > > Run a streaming application that sources from Kafka. Many batches are queued > in the job list before the application is stopped; after stopping it and then > restarting it from the checkpoint file, the Spark UI shows the input size of > those queued batches stored in the checkpoint file as 0: > Batch Time | Input Size | Scheduling Delay (?) | Processing Time (?) | Total Delay (?) | Output Ops: Succeeded/Total > 2016/10/26 19:02:15 | 0 events | 1.1 h | 13 s | 1.1 h | -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
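For reference, a minimal sketch of the checkpoint-recovery pattern the report describes, using the standard StreamingContext.getOrCreate API. The checkpoint path, app name, and job body are placeholders, not taken from the reporter's application.
{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch of checkpoint-based recovery; path and job body are illustrative.
val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("recovery-sketch")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // ... define the Kafka input DStream and the output operations here ...
  ssc
}

// On restart this rebuilds the context from the checkpoint, including batches
// that were still queued when the app stopped; those recovered batches are the
// ones whose input size the UI reports as "0 events" in this ticket.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
{code}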