[jira] [Resolved] (SPARK-33596) NPE when there is no watermark metrics

2020-11-30 Thread Genmao Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu resolved SPARK-33596.
---
Resolution: Won't Fix

> NPE when there is no watermark metrics
> --
>
> Key: SPARK-33596
> URL: https://issues.apache.org/jira/browse/SPARK-33596
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.1.0
>Reporter: Genmao Yu
>Priority: Major
>
> We parse the process timestamp at 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala#L153,
> but it will throw an NPE when there are no event time metrics.
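For context, a minimal sketch of the kind of guard that avoids this NPE, assuming a parseProgressTimestamp helper like the one used on that page (the helper and its signature are assumptions for illustration, not the actual patch):

{code:scala}
import org.apache.spark.sql.streaming.StreamingQueryProgress

// Hedged sketch (not the actual StreamingQueryStatisticsPage code): look up the
// watermark defensively so a progress with no event-time metrics yields None
// instead of a NullPointerException.
def watermarkMillis(progress: StreamingQueryProgress,
                    parseProgressTimestamp: String => Long): Option[Long] =
  Option(progress.eventTime)                    // event-time map may be absent
    .flatMap(m => Option(m.get("watermark")))   // "watermark" entry may be absent
    .map(parseProgressTimestamp)                // assumed timestamp parser
{code}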






[jira] [Updated] (SPARK-33596) NPE when there is no watermark metrics

2020-11-30 Thread Genmao Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-33596:
--
Summary: NPE when there is no watermark metrics  (was: NPE when there is no 
EventTime)

> NPE when there is no watermark metrics
> --
>
> Key: SPARK-33596
> URL: https://issues.apache.org/jira/browse/SPARK-33596
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.1.0
>Reporter: Genmao Yu
>Priority: Major
>
> We parse the process timestamp at 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala#L153,
> but it will throw an NPE when there are no event time metrics.






[jira] [Created] (SPARK-33596) NPE when there is no EventTime

2020-11-29 Thread Genmao Yu (Jira)
Genmao Yu created SPARK-33596:
-

 Summary: NPE when there is no EventTime
 Key: SPARK-33596
 URL: https://issues.apache.org/jira/browse/SPARK-33596
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 3.1.0
Reporter: Genmao Yu


We parse the process timestamp at 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala#L153,
but it will throw an NPE when there are no event time metrics.






[jira] [Commented] (SPARK-31928) Flaky test: StreamingDeduplicationSuite.test no-data flag

2020-06-10 Thread Genmao Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130471#comment-17130471
 ] 

Genmao Yu commented on SPARK-31928:
---

There is a related PR: [https://github.com/apache/spark/pull/28391]

> Flaky test: StreamingDeduplicationSuite.test no-data flag
> -
>
> Key: SPARK-31928
> URL: https://issues.apache.org/jira/browse/SPARK-31928
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming, Tests
>Affects Versions: 3.1.0
>Reporter: Gabor Somogyi
>Priority: Major
>
> Test failed: 
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123621/
> {code:java}
> [info]   with spark.sql.streaming.noDataMicroBatches.enabled = false 
> [info]   Assert on query failed: : 
> [info]   Assert on query failed: 
> [info]   
> [info]   == Progress ==
> [info]  
> StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@372edb19,Map(spark.sql.streaming.noDataMicroBatches.enabled
>  -> false),null)
> [info]  AddData to MemoryStream[value#437541]: 10,11,12,13,14,15
> [info]  CheckAnswer: [10],[11],[12],[13],[14],[15]
> [info]  AssertOnQuery(<condition>, Check total state rows = List(6), 
> updated state rows = List(6))
> [info]  AddData to MemoryStream[value#437541]: 25
> [info]  CheckNewAnswer: [25]
> [info]  AssertOnQuery(<condition>, Check total state rows = List(7), 
> updated state rows = List(1))
> [info]   => AssertOnQuery(<condition>, )
> [info]   
> [info]   == Stream ==
> [info]   Output Mode: Append
> [info]   Stream state: {MemoryStream[value#437541]: 1}
> [info]   Thread state: alive
> [info]   Thread stack trace: java.lang.Thread.sleep(Native Method)
> [info]   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:241)
> [info]   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$1375/882607691.apply$mcZ$sp(Unknown
>  Source)
> [info]   
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
> [info]   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
> [info]   
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
> [info]   
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
> [info]   
> [info]   
> [info]   == Sink ==
> [info]   0: [11] [14] [13] [10] [15] [12]
> [info]   1: [25]
> [info]   
> [info]   
> [info]   == Plan ==
> [info]   == Parsed Logical Plan ==
> [info]   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@158ccd13
> [info]   +- Project [cast(eventTime#437544-T1ms as bigint) AS 
> eventTime#437548L]
> [info]  +- Deduplicate [value#437541, eventTime#437544-T1ms]
> [info] +- EventTimeWatermark eventTime#437544: timestamp, 10 seconds
> [info]+- Project [value#437541, cast(value#437541 as timestamp) 
> AS eventTime#437544]
> [info]   +- StreamingDataSourceV2Relation [value#437541], 
> org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@1802eea6, 
> MemoryStream[value#437541], 0, 1
> [info]   
> [info]   == Analyzed Logical Plan ==
> [info]   
> [info]   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@158ccd13
> [info]   +- Project [cast(eventTime#437544-T1ms as bigint) AS 
> eventTime#437548L]
> [info]  +- Deduplicate [value#437541, eventTime#437544-T1ms]
> [info] +- EventTimeWatermark eventTime#437544: timestamp, 10 seconds
> [info]+- Project [value#437541, cast(value#437541 as timestamp) 
> AS eventTime#437544]
> [info]   +- StreamingDataSourceV2Relation [value#437541], 
> org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@1802eea6, 
> MemoryStream[value#437541], 0, 1
> [info]   
> [info]   == Optimized Logical Plan ==
> [info]   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@158ccd13
> [info]   +- Project [cast(eventTime#437544-T1ms as bigint) AS 
> eventTime#437548L]
> [info]  +- Deduplicate [value#437541, eventTime#437544-T1ms]
> [info] +- EventTimeWatermark eventTime#437544: timestamp, 10 seconds
> [info]+- Project [value#437541, cast(value#437541 as timestamp) 
> AS eventTime#437544]
> [info]   +- StreamingDataSourceV2Relation [value#437541], 
> org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@1802eea6, 
> MemoryStream[value#437541], 0, 1
> [info]   
> [info]   == Physical Plan ==
> [info]   WriteToDataSourceV2 
> org.apache.spark.sql.execution.streaming.sources

[jira] [Created] (SPARK-31953) Add Spark Structured Streaming History Server Support

2020-06-10 Thread Genmao Yu (Jira)
Genmao Yu created SPARK-31953:
-

 Summary: Add Spark Structured Streaming History Server Support
 Key: SPARK-31953
 URL: https://issues.apache.org/jira/browse/SPARK-31953
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 2.4.6, 3.0.0
Reporter: Genmao Yu









[jira] [Created] (SPARK-31913) StackOverflowError in FileScanRDD

2020-06-05 Thread Genmao Yu (Jira)
Genmao Yu created SPARK-31913:
-

 Summary: StackOverflowError in FileScanRDD
 Key: SPARK-31913
 URL: https://issues.apache.org/jira/browse/SPARK-31913
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.5, 3.0.0
Reporter: Genmao Yu


Reading from FileScanRDD may fail with a StackOverflowError in my environment:
- There are a large number of empty files in the table partition.
- `spark.sql.files.maxPartitionBytes` is set to a large value, e.g. 1024MB.

A quick workaround is to set `spark.sql.files.maxPartitionBytes` to a smaller 
value, such as the default 128MB.

A better way is to resolve the recursive calls in FileScanRDD (a sketch follows 
the stack trace below).

{code}
java.lang.StackOverflowError
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.getSubject(Subject.java:297)
at 
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:648)
at 
org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2828)
at 
org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2818)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2684)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at 
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
at 
org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:640)
at 
org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:148)
at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:143)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:326)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
{code}
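A minimal sketch of the suggested direction, using simplified types rather than the real FileScanRDD internals (the names below are illustrative assumptions):

{code:scala}
// Hedged sketch: advance over a partition's files with a loop instead of a
// recursive call per file, so a partition packed with thousands of empty files
// (e.g. under a large spark.sql.files.maxPartitionBytes) cannot overflow the stack.
def firstNonEmptyIterator(files: Iterator[Iterator[String]]): Option[Iterator[String]] = {
  while (files.hasNext) {
    val current = files.next()
    if (current.hasNext) {
      return Some(current)  // found a file that actually has rows
    }
    // empty file: keep looping, no recursion
  }
  None
}
{code}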






[jira] [Updated] (SPARK-31677) Use KVStore to cache stream query progress

2020-05-11 Thread Genmao Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-31677:
--
Environment: (was: 1. Streaming query progress information are cached 
twice in *StreamExecution* and *StreamingQueryStatusListener*.  It is 
memory-wasting. We can make this two usage unified.
2. Use *KVStore* instead to cache streaming query progress information.)

> Use KVStore to cache stream query progress
> --
>
> Key: SPARK-31677
> URL: https://issues.apache.org/jira/browse/SPARK-31677
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.5, 3.0.0
>Reporter: Genmao Yu
>Priority: Major
>







[jira] [Updated] (SPARK-31677) Use KVStore to cache stream query progress

2020-05-11 Thread Genmao Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-31677:
--
Description: 
1. Streaming query progress information is cached twice, in *StreamExecution* 
and in *StreamingQueryStatusListener*. This wastes memory; the two usages can 
be unified.
2. Use a *KVStore* instead to cache streaming query progress information.
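A minimal sketch of the idea, assuming the org.apache.spark.util.kvstore API that already backs other Spark UI data (the wrapper class and key below are illustrative assumptions, not the proposed implementation):

{code:scala}
import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// Hypothetical wrapper type; @KVIndex marks the natural key the store indexes on.
class QueryProgressWrapper(val runId: String, val progressJson: String) {
  @KVIndex
  def id: String = runId
}

// One shared store instead of two separate caches in StreamExecution and
// StreamingQueryStatusListener.
val store = new InMemoryStore()
store.write(new QueryProgressWrapper("run-1", """{"batchId": 0}"""))
val latest = store.read(classOf[QueryProgressWrapper], "run-1")
{code}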

> Use KVStore to cache stream query progress
> --
>
> Key: SPARK-31677
> URL: https://issues.apache.org/jira/browse/SPARK-31677
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.5, 3.0.0
>Reporter: Genmao Yu
>Priority: Major
>
> 1. Streaming query progress information is cached twice, in *StreamExecution* 
> and in *StreamingQueryStatusListener*. This wastes memory; the two usages can 
> be unified.
> 2. Use a *KVStore* instead to cache streaming query progress information.






[jira] [Created] (SPARK-31677) Use KVStore to cache stream query progress

2020-05-11 Thread Genmao Yu (Jira)
Genmao Yu created SPARK-31677:
-

 Summary: Use KVStore to cache stream query progress
 Key: SPARK-31677
 URL: https://issues.apache.org/jira/browse/SPARK-31677
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.4.5, 3.0.0
 Environment: 1. Streaming query progress information is cached twice, 
in *StreamExecution* and in *StreamingQueryStatusListener*. This wastes memory; 
the two usages can be unified.
2. Use a *KVStore* instead to cache streaming query progress information.
Reporter: Genmao Yu









[jira] [Created] (SPARK-31593) Remove unnecessary streaming query progress update

2020-04-28 Thread Genmao Yu (Jira)
Genmao Yu created SPARK-31593:
-

 Summary: Remove unnecessary streaming query progress update
 Key: SPARK-31593
 URL: https://issues.apache.org/jira/browse/SPARK-31593
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.5, 3.0.0
Reporter: Genmao Yu


The Structured Streaming progress reporter will always report an `empty` progress 
when there is no new data. By design, we should only provide a progress update 
every 10s (by default) when there is no new data.
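A minimal sketch of the intended behavior, with illustrative names rather than the actual ProgressReporter fields: emit an `empty` progress at most once per interval instead of after every no-data micro-batch.

{code:scala}
// Hedged sketch: rate-limit no-data progress updates to one per interval
// (10s by default).
class NoDataProgressThrottle(intervalMs: Long = 10000L) {
  private var lastNoDataProgressMs: Long = -1L

  def shouldReport(nowMs: Long): Boolean = {
    if (lastNoDataProgressMs < 0 || nowMs - lastNoDataProgressMs >= intervalMs) {
      lastNoDataProgressMs = nowMs
      true   // first no-data batch, or the interval has elapsed
    } else {
      false  // suppress this empty progress
    }
  }
}
{code}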






[jira] [Created] (SPARK-29973) Use nano time to calculate 'processedRowsPerSecond' to avoid 'NaN'/'Infinity'

2019-11-20 Thread Genmao Yu (Jira)
Genmao Yu created SPARK-29973:
-

 Summary: Use nano time to calculate 'processedRowsPerSecond' to 
avoid 'NaN'/'Infinity'
 Key: SPARK-29973
 URL: https://issues.apache.org/jira/browse/SPARK-29973
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 3.0.0
Reporter: Genmao Yu


The {{"processingTimeSec"}} of batch may be less than 1 millis.  As 
{{"processingTimeSec"}} is calculated in millis, so {{"processingTimeSec"}} 
equals 0L. If there is no data in this batch, the {{"processedRowsPerSecond"}} 
equals {{"0/0.0d"}}, i.e. {{"Double.NaN"}}. If there are some data in this 
batch, the {{"processedRowsPerSecond"}} equals {{"N/0.0d"}}, i.e. 
{{"Double.Infinity"}}.






[jira] [Updated] (SPARK-29683) Job failed due to executor failures all available nodes are blacklisted

2019-10-31 Thread Genmao Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-29683:
--
Description: 
My streaming job will fail with *due to executor failures all available nodes are 
blacklisted*. This exception is thrown only when all nodes are blacklisted:
{code:java}
def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= 
numClusterNodes

val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ 
allocatorBlacklist.keySet
{code}
After diving into the code, I found some critical conditions that are not handled 
properly (a sketch follows the list below):
 - unchecked `excludeNodes`: this comes from user config. If it is not set 
properly, it may lead to "currentBlacklistedYarnNodes.size >= numClusterNodes". 
For example, we may list some nodes that are not in the Yarn cluster.
{code:java}
excludeNodes = (invalid1, invalid2, invalid3)
clusterNodes = (valid1, valid2)
{code}

 - `numClusterNodes` may equal 0: during an HA Yarn failover, it takes some 
time for all NodeManagers to register with the ResourceManager again. In this 
case, `numClusterNodes` may equal 0 or some other number, and the Spark driver 
fails.
 - overly strict condition check: the Spark driver will fail as soon as 
"currentBlacklistedYarnNodes.size >= numClusterNodes" holds. This condition does 
not necessarily indicate an unrecoverable failure; for example, some NodeManagers 
may simply be restarting. We could allow some waiting time before failing the job.
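A minimal sketch of the guards suggested by the first two points, with simplified types (an illustration, not the actual YARN allocator code):

{code:scala}
// Hedged sketch: ignore exclude-list entries that are not real cluster nodes, and
// never treat an empty cluster view (e.g. right after an RM failover) as
// "all nodes blacklisted".
def effectiveBlacklist(excludeNodes: Set[String],
                       schedulerBlacklist: Set[String],
                       allocatorBlacklist: Set[String],
                       clusterNodes: Set[String]): Set[String] = {
  val validExcludes = excludeNodes.intersect(clusterNodes)  // drop unknown node names
  validExcludes ++ schedulerBlacklist ++ allocatorBlacklist
}

def isAllNodeBlacklisted(blacklisted: Set[String], numClusterNodes: Int): Boolean =
  numClusterNodes > 0 && blacklisted.size >= numClusterNodes
{code}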

  was:
My streaming job will fail *due to executor failures all available nodes are 
blacklisted*.  This exception is thrown only when all node is blacklisted:
{code}
def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= 
numClusterNodes

val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ 
allocatorBlacklist.keySet
{code}

After  diving into the code, I found some critical conditions not be handle 
properly:
- unchecked `excludeNodes`: it comes from user config. If not set properly, it 
may lead to "currentBlacklistedYarnNodes.size >= numClusterNodes". For example, 
we may set some nodes not in Yarn cluster.
{code}
excludeNodes = (invalid1, invalid2, invalid3)
clusterNodes = (valid1, valid2)
{code}
- `numClusterNodes` may equals 0: When HA Yarn failover, it will take some time 
for all NodeManagers to register ResourceManager again. In this case, 
`numClusterNode` may equals 0 or some other number, and Spark driver failed.
- too strong condition check: Spark driver will fail as long as 
"currentBlacklistedYarnNodes.size >= numClusterNodes". This condition should 
not indicate a unrecovered fatal. For example, there are some NodeManagers 
restarting. So we can give some waiting time before job failed.


> Job failed due to executor failures all available nodes are blacklisted
> ---
>
> Key: SPARK-29683
> URL: https://issues.apache.org/jira/browse/SPARK-29683
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Genmao Yu
>Priority: Major
>
> My streaming job will fail with *due to executor failures all available nodes 
> are blacklisted*. This exception is thrown only when all nodes are blacklisted:
> {code:java}
> def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= 
> numClusterNodes
> val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ 
> allocatorBlacklist.keySet
> {code}
> After diving into the code, I found some critical conditions that are not 
> handled properly:
>  - unchecked `excludeNodes`: this comes from user config. If it is not set 
> properly, it may lead to "currentBlacklistedYarnNodes.size >= numClusterNodes". 
> For example, we may list some nodes that are not in the Yarn cluster.
> {code:java}
> excludeNodes = (invalid1, invalid2, invalid3)
> clusterNodes = (valid1, valid2)
> {code}
>  - `numClusterNodes` may equal 0: during an HA Yarn failover, it takes some 
> time for all NodeManagers to register with the ResourceManager again. In this 
> case, `numClusterNodes` may equal 0 or some other number, and the Spark driver 
> fails.
>  - overly strict condition check: the Spark driver will fail as soon as 
> "currentBlacklistedYarnNodes.size >= numClusterNodes" holds. This condition 
> does not necessarily indicate an unrecoverable failure; for example, some 
> NodeManagers may simply be restarting. We could allow some waiting time before 
> failing the job.






[jira] [Created] (SPARK-29683) Job failed due to executor failures all available nodes are blacklisted

2019-10-31 Thread Genmao Yu (Jira)
Genmao Yu created SPARK-29683:
-

 Summary: Job failed due to executor failures all available nodes 
are blacklisted
 Key: SPARK-29683
 URL: https://issues.apache.org/jira/browse/SPARK-29683
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 3.0.0
Reporter: Genmao Yu


My streaming job will fail with *due to executor failures all available nodes are 
blacklisted*. This exception is thrown only when all nodes are blacklisted:
{code}
def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= 
numClusterNodes

val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ 
allocatorBlacklist.keySet
{code}

After diving into the code, I found some critical conditions that are not handled 
properly:
- unchecked `excludeNodes`: this comes from user config. If it is not set 
properly, it may lead to "currentBlacklistedYarnNodes.size >= numClusterNodes". 
For example, we may list some nodes that are not in the Yarn cluster.
{code}
excludeNodes = (invalid1, invalid2, invalid3)
clusterNodes = (valid1, valid2)
{code}
- `numClusterNodes` may equal 0: during an HA Yarn failover, it takes some time 
for all NodeManagers to register with the ResourceManager again. In this case, 
`numClusterNodes` may equal 0 or some other number, and the Spark driver fails.
- overly strict condition check: the Spark driver will fail as soon as 
"currentBlacklistedYarnNodes.size >= numClusterNodes" holds. This condition does 
not necessarily indicate an unrecoverable failure; for example, some NodeManagers 
may simply be restarting. We could allow some waiting time before failing the job.






[jira] [Created] (SPARK-29543) Support Structured Streaming UI

2019-10-21 Thread Genmao Yu (Jira)
Genmao Yu created SPARK-29543:
-

 Summary: Support Structured Streaming UI
 Key: SPARK-29543
 URL: https://issues.apache.org/jira/browse/SPARK-29543
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming, Web UI
Affects Versions: 3.0.0
Reporter: Genmao Yu


Opening this JIRA to add support for a Structured Streaming UI.






[jira] [Updated] (SPARK-29438) Failed to get state store in stream-stream join

2019-10-11 Thread Genmao Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-29438:
--
Description: 
Now, Spark uses the `TaskPartitionId` to determine the StateStore path.
{code:java}
TaskPartitionId   \ 
StateStoreVersion  --> StoreProviderId -> StateStore
StateStoreName/  
{code}
In Spark stages, the task partition id is determined by the number of tasks. As 
described above, the StateStore file path depends on the task partition id. So if 
the stream-stream join task partition id changes from the last batch, the task 
will read the wrong StateStore data or fail because the expected StateStore data 
does not exist. This happens in some corner cases. The following is a sample 
pseudocode:
{code:java}
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
{code}
A simplified DAG like this:
{code:java}
DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
 (streamDf3)|   (streamDf1)(streamDf2)
 |  |   | |
  Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
 |  |   | | 
   SortSort | |
 \  /   \ /
  \/ \   /
SortMergeJoinStreamingSymmetricHashJoin
 \ /
   \ /
 \ /
Union
{code}
The stream-stream join task ids will range from 200 to 399, as those tasks are in 
the same stage as the `SortMergeJoin`. But when there is no new incoming data in 
`streamDf3` in some batch, it will generate an empty LocalRelation, and the 
SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, the 
stream-stream join task ids will range from 1 to 200. Finally, the tasks will 
derive the wrong StateStore path from the TaskPartitionId and fail with an error 
reading the state store delta file.
{code:java}
LocalTableScan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
 |  |   | |
BroadcastExchange   |  Exchange(200) Exchange(200)
 |  |   | | 
 |  |   | |
  \/ \   /
   \ /\ /
  BroadcastHashJoin StreamingSymmetricHashJoin
 \ /
   \ /
 \ /
Union
{code}
In my job, I disabled the auto broadcast join feature (set 
spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should make 
the StateStore path deterministic rather than dependent on the TaskPartitionId.
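For reference, a minimal sketch of the workaround mentioned above (standard Spark SQL configuration; the session setup is illustrative): disabling automatic broadcast joins keeps the plan shape, and therefore the task partition ids, stable across batches.

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("stream-stream-join-workaround")  // illustrative app name
  .getOrCreate()

// -1 disables automatic broadcast joins, so the SortMergeJoin branch is not
// swapped for a BroadcastHashJoin when streamDf3 has an empty batch.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
{code}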

  was:
Now, Spark use the `TaskPartitionId` to determine the StateStore path.
{code:java}
TaskPartitionId   \ 
StateStoreVersion  --> StoreProviderId -> StateStore
StateStoreName/  
{code}
In spark stages, the task partition id is determined by the number of tasks. As 
we said the StateStore file path depends on the task partition id. So if 
stream-stream join task partition id is changed against last batch, it will get 
wrong StateStore data or fail with non-exist StateStore data. In some corner 
cases, it happened:
{code:java}
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
{code}
A simplified DAG like this:
{code:java}
DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
 (streamDf3)|   (streamDf1)(streamDf2)
 |  |   | |
  Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
 |  |   | | 
   SortSort | |
 \  /   \ /
  \/ \   /
SortMergeJoinStreamingSymmetricHashJoin
 \ /
   \ /
 \ /
Union
{code}
Stream-Steam join task Id will start from 200 to 399 as they are in the same 
stage with `SortMergeJoin`. But when there is no new incoming data in 
`streamDf3` in some batch, it will generate a empty LocalRelation, and then the 
SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, 
Stream-Steam join task Id will start from 1 to 200. Finally, it will get wrong 
StateStore path through TaskPartitionId, and failed with er

[jira] [Commented] (SPARK-29438) Failed to get state store in stream-stream join

2019-10-11 Thread Genmao Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949461#comment-16949461
 ] 

Genmao Yu commented on SPARK-29438:
---

[~kabhwan] Yes, the whole state store path is 
`CheckpointRoot/operatorId/taskPartitionId/version/`. The `taskPartitionId` may 
change in some corner cases in a subsequent batch; that is what I mean.
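A minimal sketch of the layout described in this comment (the path shape is taken from the comment itself; the helper is illustrative), showing why a shifted taskPartitionId points a task at a different, possibly missing, directory:

{code:scala}
// Hedged sketch: state store files live under
// CheckpointRoot/operatorId/taskPartitionId/version, so changing only the task
// partition id between batches changes which files a task tries to read.
def stateStoreDir(checkpointRoot: String,
                  operatorId: Long,
                  taskPartitionId: Int,
                  version: Long): String =
  s"$checkpointRoot/$operatorId/$taskPartitionId/$version"

// Same operator and version, read by a task whose partition id has shifted:
val expected = stateStoreDir("hdfs://nn/checkpoint", 0L, 205, 12L)
val actual   = stateStoreDir("hdfs://nn/checkpoint", 0L, 5, 12L)  // wrong directory
{code}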

> Failed to get state store in stream-stream join
> ---
>
> Key: SPARK-29438
> URL: https://issues.apache.org/jira/browse/SPARK-29438
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Genmao Yu
>Priority: Critical
>
> Now, Spark uses the `TaskPartitionId` to determine the StateStore path.
> {code:java}
> TaskPartitionId   \ 
> StateStoreVersion  --> StoreProviderId -> StateStore
> StateStoreName/  
> {code}
> In Spark stages, the task partition id is determined by the number of tasks. 
> As described above, the StateStore file path depends on the task partition id. 
> So if the stream-stream join task partition id changes from the last batch, the 
> task will read the wrong StateStore data or fail because the expected StateStore 
> data does not exist. This happens in some corner cases:
> {code:java}
> val df3 = streamDf1.join(streamDf2)
> val df5 = streamDf3.join(batchDf4)
> val df = df3.union(df5)
> df.writeStream...start()
> {code}
> A simplified DAG like this:
> {code:java}
> DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  (streamDf3)|   (streamDf1)(streamDf2)
>  |  |   | |
>   Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
>  |  |   | | 
>SortSort | |
>  \  /   \ /
>   \/ \   /
> SortMergeJoinStreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> The stream-stream join task ids will range from 200 to 399, as those tasks are 
> in the same stage as the `SortMergeJoin`. But when there is no new incoming data 
> in `streamDf3` in some batch, it will generate an empty LocalRelation, and the 
> SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, the 
> stream-stream join task ids will range from 1 to 200. Finally, the tasks will 
> derive the wrong StateStore path from the TaskPartitionId and fail with an error 
> reading the state store delta file.
> {code:java}
> LocalTableScan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  |  |   | |
> BroadcastExchange   |  Exchange(200) Exchange(200)
>  |  |   | | 
>  |  |   | |
>   \/ \   /
>\ /\ /
>   BroadcastHashJoin StreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> In my job, I disabled the auto broadcast join feature (set 
> spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should 
> make the StateStore path deterministic rather than dependent on the 
> TaskPartitionId.






[jira] [Comment Edited] (SPARK-29438) Failed to get state store in stream-stream join

2019-10-11 Thread Genmao Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949365#comment-16949365
 ] 

Genmao Yu edited comment on SPARK-29438 at 10/11/19 11:15 AM:
--

There are several alternatives for resolving this issue:
 * Add rules to make the stream-stream join task partition id more deterministic. 
In the above case, we can reorder the LogicalPlan children of the Union, i.e. 
place `StreamingSymmetricHashJoin` before `SortMergeJoin/BroadcastHashJoin`.
 * As said in the description, the StateStore path should not depend on the 
TaskPartitionId. We could get the StateStore path from the StateStoreCoordinator, 
but this may increase the RPC load on the driver.
 * Dynamically disable `autoBroadcastJoin` via some rules.


was (Author: unclegen):
There are several optional alternatives to resolve this issue:
 * Adding some rules to make the stream-stream join task partition id more 
determinate. In above cases, we can reorder the LogicalPlan in Union, i.e. 
making`StreamingSymmetricHashJoin` prior to `SortMergeJoin/BroadcastHashJoin`. 
 * As said in desc, the StateStore path should not depend on the 
TaskPartitionId. We may get the StateStore path from StateStoreCoordinator. But 
this may increase the RPC load in Driver.
 * Dynamically disable the `autoBroadcastJoin` in some rules.

> Failed to get state store in stream-stream join
> ---
>
> Key: SPARK-29438
> URL: https://issues.apache.org/jira/browse/SPARK-29438
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Genmao Yu
>Priority: Critical
>
> Now, Spark uses the `TaskPartitionId` to determine the StateStore path.
> {code:java}
> TaskPartitionId   \ 
> StateStoreVersion  --> StoreProviderId -> StateStore
> StateStoreName/  
> {code}
> In Spark stages, the task partition id is determined by the number of tasks. 
> As described above, the StateStore file path depends on the task partition id. 
> So if the stream-stream join task partition id changes from the last batch, the 
> task will read the wrong StateStore data or fail because the expected StateStore 
> data does not exist. This happens in some corner cases:
> {code:java}
> val df3 = streamDf1.join(streamDf2)
> val df5 = streamDf3.join(batchDf4)
> val df = df3.union(df5)
> df.writeStream...start()
> {code}
> A simplified DAG like this:
> {code:java}
> DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  (streamDf3)|   (streamDf1)(streamDf2)
>  |  |   | |
>   Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
>  |  |   | | 
>SortSort | |
>  \  /   \ /
>   \/ \   /
> SortMergeJoinStreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> The stream-stream join task ids will range from 200 to 399, as those tasks are 
> in the same stage as the `SortMergeJoin`. But when there is no new incoming data 
> in `streamDf3` in some batch, it will generate an empty LocalRelation, and the 
> SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, the 
> stream-stream join task ids will range from 1 to 200. Finally, the tasks will 
> derive the wrong StateStore path from the TaskPartitionId and fail with an error 
> reading the state store delta file.
> {code:java}
> LocalTableScan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  |  |   | |
> BroadcastExchange   |  Exchange(200) Exchange(200)
>  |  |   | | 
>  |  |   | |
>   \/ \   /
>\ /\ /
>   BroadcastHashJoin StreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> In my job, I disabled the auto broadcast join feature (set 
> spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should 
> make the StateStore path deterministic rather than dependent on the 
> TaskPartitionId.




[jira] [Comment Edited] (SPARK-29438) Failed to get state store in stream-stream join

2019-10-11 Thread Genmao Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949365#comment-16949365
 ] 

Genmao Yu edited comment on SPARK-29438 at 10/11/19 11:15 AM:
--

There are several alternatives for resolving this issue:
 * Add rules to make the stream-stream join task partition id more deterministic. 
In the above case, we can reorder the LogicalPlan children of the Union, i.e. 
place `StreamingSymmetricHashJoin` before `SortMergeJoin/BroadcastHashJoin`.
 * As said in the description, the StateStore path should not depend on the 
TaskPartitionId. We could get the StateStore path from the StateStoreCoordinator, 
but this may increase the RPC load on the driver.
 * Dynamically disable `autoBroadcastJoin` via some rules.


was (Author: unclegen):
There are several optional alternatives to resolve this issue:
 * Adding some rules to make the stream-stream join task partition id more 
determinate. In above cases, we can reorder the LogicalPlan in Union, i.e. 
making`StreamingSymmetricHashJoin` prior to `SortMergeJoin/BroadcastHashJoin`. 
 *   As said in desc, the StateStore path should not depend on the 
TaskPartitionId. We may get the StateStore path from StateStoreCoordinator. But 
this may increase the RPC load in Driver.
 * Dynamically disable the `autoBroadcastJoin` in some rules.

> Failed to get state store in stream-stream join
> ---
>
> Key: SPARK-29438
> URL: https://issues.apache.org/jira/browse/SPARK-29438
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Genmao Yu
>Priority: Critical
>
> Now, Spark uses the `TaskPartitionId` to determine the StateStore path.
> {code:java}
> TaskPartitionId   \ 
> StateStoreVersion  --> StoreProviderId -> StateStore
> StateStoreName/  
> {code}
> In Spark stages, the task partition id is determined by the number of tasks. 
> As described above, the StateStore file path depends on the task partition id. 
> So if the stream-stream join task partition id changes from the last batch, the 
> task will read the wrong StateStore data or fail because the expected StateStore 
> data does not exist. This happens in some corner cases:
> {code:java}
> val df3 = streamDf1.join(streamDf2)
> val df5 = streamDf3.join(batchDf4)
> val df = df3.union(df5)
> df.writeStream...start()
> {code}
> A simplified DAG like this:
> {code:java}
> DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  (streamDf3)|   (streamDf1)(streamDf2)
>  |  |   | |
>   Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
>  |  |   | | 
>SortSort | |
>  \  /   \ /
>   \/ \   /
> SortMergeJoinStreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> The stream-stream join task ids will range from 200 to 399, as those tasks are 
> in the same stage as the `SortMergeJoin`. But when there is no new incoming data 
> in `streamDf3` in some batch, it will generate an empty LocalRelation, and the 
> SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, the 
> stream-stream join task ids will range from 1 to 200. Finally, the tasks will 
> derive the wrong StateStore path from the TaskPartitionId and fail with an error 
> reading the state store delta file.
> {code:java}
> LocalTableScan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  |  |   | |
> BroadcastExchange   |  Exchange(200) Exchange(200)
>  |  |   | | 
>  |  |   | |
>   \/ \   /
>\ /\ /
>   BroadcastHashJoin StreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> In my job, I disabled the auto broadcast join feature (set 
> spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should 
> make the StateStore path deterministic rather than dependent on the 
> TaskPartitionId.




[jira] [Commented] (SPARK-29438) Failed to get state store in stream-stream join

2019-10-11 Thread Genmao Yu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949365#comment-16949365
 ] 

Genmao Yu commented on SPARK-29438:
---

There are several alternatives for resolving this issue:
 * Add rules to make the stream-stream join task partition id more deterministic. 
In the above case, we can reorder the LogicalPlan children of the Union, i.e. 
place `StreamingSymmetricHashJoin` before `SortMergeJoin/BroadcastHashJoin`.
 * As said in the description, the StateStore path should not depend on the 
TaskPartitionId. We could get the StateStore path from the StateStoreCoordinator, 
but this may increase the RPC load on the driver.
 * Dynamically disable `autoBroadcastJoin` via some rules.

> Failed to get state store in stream-stream join
> ---
>
> Key: SPARK-29438
> URL: https://issues.apache.org/jira/browse/SPARK-29438
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Genmao Yu
>Priority: Critical
>
> Now, Spark uses the `TaskPartitionId` to determine the StateStore path.
> {code:java}
> TaskPartitionId   \ 
> StateStoreVersion  --> StoreProviderId -> StateStore
> StateStoreName/  
> {code}
> In Spark stages, the task partition id is determined by the number of tasks. 
> As described above, the StateStore file path depends on the task partition id. 
> So if the stream-stream join task partition id changes from the last batch, the 
> task will read the wrong StateStore data or fail because the expected StateStore 
> data does not exist. This happens in some corner cases:
> {code:java}
> val df3 = streamDf1.join(streamDf2)
> val df5 = streamDf3.join(batchDf4)
> val df = df3.union(df5)
> df.writeStream...start()
> {code}
> A simplified DAG like this:
> {code:java}
> DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  (streamDf3)|   (streamDf1)(streamDf2)
>  |  |   | |
>   Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
>  |  |   | | 
>SortSort | |
>  \  /   \ /
>   \/ \   /
> SortMergeJoinStreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> The stream-stream join task ids will range from 200 to 399, as those tasks are 
> in the same stage as the `SortMergeJoin`. But when there is no new incoming data 
> in `streamDf3` in some batch, it will generate an empty LocalRelation, and the 
> SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, the 
> stream-stream join task ids will range from 1 to 200. Finally, the tasks will 
> derive the wrong StateStore path from the TaskPartitionId and fail with an error 
> reading the state store delta file.
> {code:java}
> LocalTableScan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  |  |   | |
> BroadcastExchange   |  Exchange(200) Exchange(200)
>  |  |   | | 
>  |  |   | |
>   \/ \   /
>\ /\ /
>   BroadcastHashJoin StreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> In my job, I disabled the auto broadcast join feature (set 
> spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should 
> make the StateStore path deterministic rather than dependent on the 
> TaskPartitionId.






[jira] [Updated] (SPARK-29438) Failed to get state store in stream-stream join

2019-10-11 Thread Genmao Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-29438:
--
Description: 
Now, Spark uses the `TaskPartitionId` to determine the StateStore path.
{code:java}
TaskPartitionId   \ 
StateStoreVersion  --> StoreProviderId -> StateStore
StateStoreName/  
{code}
In Spark stages, the task partition id is determined by the number of tasks. As 
described above, the StateStore file path depends on the task partition id. So if 
the stream-stream join task partition id changes from the last batch, the task 
will read the wrong StateStore data or fail because the expected StateStore data 
does not exist. This happens in some corner cases:
{code:java}
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
{code}
A simplified DAG like this:
{code:java}
DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
 (streamDf3)|   (streamDf1)(streamDf2)
 |  |   | |
  Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
 |  |   | | 
   SortSort | |
 \  /   \ /
  \/ \   /
SortMergeJoinStreamingSymmetricHashJoin
 \ /
   \ /
 \ /
Union
{code}
The stream-stream join task ids will range from 200 to 399, as those tasks are in 
the same stage as the `SortMergeJoin`. But when there is no new incoming data in 
`streamDf3` in some batch, it will generate an empty LocalRelation, and the 
SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, the 
stream-stream join task ids will range from 1 to 200. Finally, the tasks will 
derive the wrong StateStore path from the TaskPartitionId and fail with an error 
reading the state store delta file.
{code:java}
LocalTableScan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
 |  |   | |
BroadcastExchange   |  Exchange(200) Exchange(200)
 |  |   | | 
 |  |   | |
  \/ \   /
   \ /\ /
  BroadcastHashJoin StreamingSymmetricHashJoin
 \ /
   \ /
 \ /
Union
{code}
In my job, I disabled the auto broadcast join feature (set 
spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should make 
the StateStore path deterministic rather than dependent on the TaskPartitionId.

  was:
Now, Spark use the `TaskPartitionId` to determine the StateStore path.
{code:java}
TaskPartitionId   \ 
StateStoreVersion  --> StoreProviderId -> StateStore
StateStoreName/  
{code}
In spark stages, the task partition id is determined by the number of tasks. As 
we said the StateStore file path depends on the task partition id. So if 
stream-stream join task partition id is changed against last batch, it will get 
wrong StateStore data or fail with non-exist StateStore data. In some corner 
cases, it happened:
{code:java}
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
{code}
A simplified DAG like this:
{code:java}
DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
 (streamDf3)|   (streamDf1)(streamDf2)
 |  |   | |
  Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
 |  |   | | 
   SortSort | |
 \  /   \ /
  \/ \   /
SortMergeJoinStreamingSymmetricHashJoin
 \ /
   \ /
 \ /
Union
{code}
Stream-Steam join task Id will start from 200 to 399 as they are in the same 
stage with `SortMergeJoin`. But when there is no new incoming data in 
`streamDf3` in some batch, it will generate a empty LocalRelation, and then the 
SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, 
Stream-Steam join task Id will start from 1 to 200. Finally, it will get wrong 
StateStore path through TaskPartitionId, and failed with error reading state 
store delta fil

[jira] [Created] (SPARK-29438) Failed to get state store in stream-stream join

2019-10-11 Thread Genmao Yu (Jira)
Genmao Yu created SPARK-29438:
-

 Summary: Failed to get state store in stream-stream join
 Key: SPARK-29438
 URL: https://issues.apache.org/jira/browse/SPARK-29438
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.4
Reporter: Genmao Yu


Now, Spark uses the `TaskPartitionId` to determine the StateStore path.
{code:java}
TaskPartitionId   \ 
StateStoreVersion  --> StoreProviderId -> StateStore
StateStoreName/  
{code}
In Spark stages, the task partition id is determined by the number of tasks. As 
described above, the StateStore file path depends on the task partition id. So if 
the stream-stream join task partition id changes from the last batch, the task 
will read the wrong StateStore data or fail because the expected StateStore data 
does not exist. This happens in some corner cases:
{code:java}
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
{code}
A simplified DAG like this:
{code:java}
DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
 (streamDf3)|   (streamDf1)(streamDf2)
 |  |   | |
  Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
 |  |   | | 
   SortSort | |
 \  /   \ /
  \/ \   /
SortMergeJoinStreamingSymmetricHashJoin
 \ /
   \ /
 \ /
Union
{code}
The stream-stream join task ids will range from 200 to 399, as those tasks are in 
the same stage as the `SortMergeJoin`. But when there is no new incoming data in 
`streamDf3` in some batch, it will generate an empty LocalRelation, and the 
SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, the 
stream-stream join task ids will range from 1 to 200. Finally, the tasks will 
derive the wrong StateStore path from the TaskPartitionId and fail with an error 
reading the state store delta file.
{code:java}
LocalTableScan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
 |  |   | |
BroadcastExchange   |  Exchange(200) Exchange(200)
 |  |   | | 
 |  |   | |
  \/ \   /
   \ /\ /
  BroadcastHashJoin StreamingSymmetricHashJoin
 \ /
   \ /
 \ /
Union
{code}
In my job, I disabled the auto broadcast join feature (set 
spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should make 
the StateStore path deterministic rather than dependent on the TaskPartitionId.






[jira] [Updated] (SPARK-28256) Failed to initialize FileContextBasedCheckpointFileManager with uri without authority

2019-07-04 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-28256:
--
Description: 
Reproduce code:

{code:sql}

CREATE TABLE `user_click_count` (`userId` STRING, `click` BIGINT)
USING org.apache.spark.sql.json
OPTIONS (path 'hdfs:///tmp/test');
{code}

Error:
{code:java}
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at 
org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:136)
at 
org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:165)
at 
org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:250)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
at 
org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:456)
at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:297)
at 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:189)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:63)
at 
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46)
at 
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85)
at 
org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98)
at 
org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297)
at 
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:379)

...

Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:134)
... 67 more
Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without 
authority: hdfs:/tmp/test/_spark_metadata
at 
org.apache.hadoop.fs.AbstractFileSystem.getUri(AbstractFileSystem.java:313)
at 
org.apache.hadoop.fs.AbstractFileSystem.<init>(AbstractFileSystem.java:266)
at org.apache.hadoop.fs.Hdfs.<init>(Hdfs.java:80)
... 72 more
{code}
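A minimal sketch of one way to avoid the error, by fully qualifying the table path against fs.defaultFS before it reaches FileContext (an illustrative helper, not the proposed Spark fix):

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hedged sketch: 'hdfs:///tmp/test' has no authority, which the FileContext-based
// checkpoint manager rejects; qualifying it against the default filesystem yields
// a URI of the form 'hdfs://<namenode>:<port>/tmp/test', which initializes fine.
def qualifyPath(raw: String, conf: Configuration): Path = {
  val fs = FileSystem.get(conf)
  new Path(raw).makeQualified(fs.getUri, fs.getWorkingDirectory)
}
{code}

Alternatively, the table path can simply include the authority explicitly in the DDL, e.g. 'hdfs://<namenode>:<port>/tmp/test'.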


  was:
{code:java}
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at 
org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:136)
at 
org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:165)
at 
org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:250)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
at 
org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:456)
at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:297)
at 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:189)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:63)
at 
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46)
at 
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85)
at 
org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98)
at 
org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297)
at 
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:379)

...

Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Na

[jira] [Updated] (SPARK-28256) Failed to initialize FileContextBasedCheckpointFileManager with uri without authority

2019-07-04 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-28256:
--
Description: 
{code:java}
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at 
org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:136)
at 
org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:165)
at 
org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:250)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
at 
org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:456)
at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:297)
at 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:189)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:63)
at 
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46)
at 
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85)
at 
org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98)
at 
org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297)
at 
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:379)

...

Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:134)
... 67 more
Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without 
authority: hdfs:/tmp/test/_spark_metadata
at 
org.apache.hadoop.fs.AbstractFileSystem.getUri(AbstractFileSystem.java:313)
at 
org.apache.hadoop.fs.AbstractFileSystem.<init>(AbstractFileSystem.java:266)
at org.apache.hadoop.fs.Hdfs.<init>(Hdfs.java:80)
... 72 more
{code}

  was:
{code}
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at 
org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:136)
at 
org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:165)
at 
org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:250)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
at 
org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:456)
at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:297)
at 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:189)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:63)
at 
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46)
at 
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85)
at 
org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98)
at 
org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297)
at 
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:379)

...

Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newIns

[jira] [Created] (SPARK-28256) Failed to initialize FileContextBasedCheckpointFileManager with uri without authority

2019-07-04 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-28256:
-

 Summary: Failed to initialize 
FileContextBasedCheckpointFileManager with uri without authority
 Key: SPARK-28256
 URL: https://issues.apache.org/jira/browse/SPARK-28256
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.0.0
Reporter: Genmao Yu


{code}
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at 
org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:136)
at 
org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:165)
at 
org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:250)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
at 
org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:456)
at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:297)
at 
org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:189)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:63)
at 
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46)
at 
org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85)
at 
org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98)
at 
org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:297)
at 
org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:379)

...

Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:134)
... 67 more
Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without 
authority: hdfs:/tmp/test13/_spark_metadata
at 
org.apache.hadoop.fs.AbstractFileSystem.getUri(AbstractFileSystem.java:313)
at 
org.apache.hadoop.fs.AbstractFileSystem.<init>(AbstractFileSystem.java:266)
at org.apache.hadoop.fs.Hdfs.<init>(Hdfs.java:80)
... 72 more
{code}
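
For context, a minimal sketch of the difference in behavior (assuming a cluster whose default filesystem is {{hdfs://namenode:8020}}; the host and port are illustrative only):

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileSystem, Path}

val conf = new Configuration()

// The FileSystem API falls back to the default authority when the scheme
// matches the default FS, so "hdfs:/tmp/test" is usually accepted here:
val fs = FileSystem.get(new Path("hdfs:/tmp/test").toUri, conf)

// FileContext (used by FileContextBasedCheckpointFileManager) resolves the
// AbstractFileSystem directly and rejects a URI with a scheme but no
// authority; that is the HadoopIllegalArgumentException in the trace above:
// FileContext.getFileContext(new Path("hdfs:/tmp/test").toUri, conf)

// Spelling out the authority avoids the failure:
val fc = FileContext.getFileContext(new Path("hdfs://namenode:8020/tmp/test").toUri, conf)
{code}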



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28158) Hive UDFs supports UDT type

2019-06-24 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-28158:
-

 Summary: Hive UDFs supports UDT type
 Key: SPARK-28158
 URL: https://issues.apache.org/jira/browse/SPARK-28158
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.3, 3.0.0
Reporter: Genmao Yu






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27717) support UNION in continuous processing

2019-05-14 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-27717:
-

 Summary: support UNION in continuous processing
 Key: SPARK-27717
 URL: https://issues.apache.org/jira/browse/SPARK-27717
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 3.0.0
Reporter: Genmao Yu






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27715) SQL query details in UI does not show in the correct format.

2019-05-14 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-27715:
-

 Summary: SQL query details in UI does not show in the correct format.
 Key: SPARK-27715
 URL: https://issues.apache.org/jira/browse/SPARK-27715
 Project: Spark
  Issue Type: Bug
  Components: SQL, Web UI
Affects Versions: 3.0.0
Reporter: Genmao Yu






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26302) retainedBatches configuration can eat up memory on driver

2019-05-14 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839253#comment-16839253
 ] 

Genmao Yu edited comment on SPARK-26302 at 5/14/19 9:27 AM:


Adding some warning in documentation is reasonable.


was (Author: unclegen):
Add some warning in documentation is reasonable.

> retainedBatches configuration can eat up memory on driver
> -
>
> Key: SPARK-26302
> URL: https://issues.apache.org/jira/browse/SPARK-26302
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, DStreams
>Affects Versions: 2.4.0
>Reporter: Behroz Sikander
>Priority: Minor
> Attachments: heap_dump_detail.png
>
>
> The documentation for configuration "spark.streaming.ui.retainedBatches" says
> "How many batches the Spark Streaming UI and status APIs remember before 
> garbage collecting"
> The default for this configuration is 1000.
> From our experience, the documentation is incomplete and we found it the hard 
> way.
> The size of a single BatchUIData is around 750KB. Increasing this value to 
> something like 5000 increases the total size to ~4GB.
> If your driver heap is not big enough, the job starts to slow down, has 
> frequent GCs and has long scheduling days. Once the heap is full, the job 
> cannot be recovered.
> A note of caution should be added to the documentation to let users know the 
> impact of this seemingly harmless configuration property.
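
For a rough sense of scale, a back-of-the-envelope sketch using the ~750 KB per-batch figure quoted above (the exact size depends on the batch contents):

{code:scala}
// Approximate driver-heap footprint of retained batch UI data.
val perBatchBytes = 750L * 1024      // ~750 KB per BatchUIData (figure from the report)
val defaultRetention = 1000          // default spark.streaming.ui.retainedBatches
val raisedRetention = 5000           // value used in the report

println(f"default: ${perBatchBytes * defaultRetention / 1e9}%.2f GB")  // ~0.77 GB
println(f"raised:  ${perBatchBytes * raisedRetention / 1e9}%.2f GB")   // ~3.84 GB
{code}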



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26302) retainedBatches configuration can eat up memory on driver

2019-05-14 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839253#comment-16839253
 ] 

Genmao Yu commented on SPARK-26302:
---

Add some warning in documentation is reasonable.

> retainedBatches configuration can eat up memory on driver
> -
>
> Key: SPARK-26302
> URL: https://issues.apache.org/jira/browse/SPARK-26302
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, DStreams
>Affects Versions: 2.4.0
>Reporter: Behroz Sikander
>Priority: Minor
> Attachments: heap_dump_detail.png
>
>
> The documentation for configuration "spark.streaming.ui.retainedBatches" says
> "How many batches the Spark Streaming UI and status APIs remember before 
> garbage collecting"
> The default for this configuration is 1000.
> From our experience, the documentation is incomplete and we found it the hard 
> way.
> The size of a single BatchUIData is around 750KB. Increasing this value to 
> something like 5000 increases the total size to ~4GB.
> If your driver heap is not big enough, the job starts to slow down, has 
> frequent GCs and has long scheduling days. Once the heap is full, the job 
> cannot be recovered.
> A note of caution should be added to the documentation to let users know the 
> impact of this seemingly harmless configuration property.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26278) V2 Streaming sources cannot be written to V1 sinks

2019-05-14 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839246#comment-16839246
 ] 

Genmao Yu commented on SPARK-26278:
---

[~jpolchlo] Could you please close this jira? This issue has been fixed.

> V2 Streaming sources cannot be written to V1 sinks
> --
>
> Key: SPARK-26278
> URL: https://issues.apache.org/jira/browse/SPARK-26278
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output, Structured Streaming
>Affects Versions: 2.3.2
>Reporter: Justin Polchlopek
>Priority: Major
>
> Starting from a streaming DataFrame derived from a custom v2 MicroBatch 
> reader, we have
> {code:java}
> val df: DataFrame = ... 
> assert(df.isStreaming)
> val outputFormat = "orc" // also applies to "csv" and "json" but not 
> "console" 
> df.writeStream
>   .format(outputFormat)
>   .option("checkpointLocation", "/tmp/checkpoints")
>   .option("path", "/tmp/result")
>   .start
> {code}
> This code fails with the following stack trace:
> {code:java}
> 2018-12-04 08:24:27 ERROR MicroBatchExecution:91 - Query [id = 
> 193f97bf-8064-4658-8aa6-0f481919eafe, runId = 
> e96ed7e5-aaf4-4ef4-a3f3-05fe0b01a715] terminated with error
> java.lang.ClassCastException: 
> org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to 
> org.apache.spark.sql.sources.v2.reader.streaming.Offset
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at 
> org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>     at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at 
> org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
>     at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>     at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>     at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>     at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>     at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>     at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>     at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>     at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){code}
> I'm filing this issue on the suggestion of [~mojodna] who suggests that this 
> problem could be resolved by backporting streaming sinks fro

[jira] [Commented] (SPARK-27634) deleteCheckpointOnStop should be configurable

2019-05-05 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833281#comment-16833281
 ] 

Genmao Yu commented on SPARK-27634:
---

Do not add a patch here. You can submit a PR to Spark instead. [How to contribute 
code to Spark|http://spark.apache.org/contributing.html]

> deleteCheckpointOnStop should be configurable
> -
>
> Key: SPARK-27634
> URL: https://issues.apache.org/jira/browse/SPARK-27634
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.2
>Reporter: Yu Wang
>Priority: Minor
> Fix For: 2.4.2
>
> Attachments: SPARK-27634.patch
>
>
> we need to delete checkpoint file after running the stream application 
> multiple times, so deleteCheckpointOnStop should be configurable



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27503) JobGenerator thread exit for some fatal errors but application keeps running

2019-04-18 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-27503:
-

 Summary: JobGenerator thread exit for some fatal errors but 
application keeps running
 Key: SPARK-27503
 URL: https://issues.apache.org/jira/browse/SPARK-27503
 Project: Spark
  Issue Type: Bug
  Components: DStreams
Affects Versions: 3.0.0
Reporter: Genmao Yu


The JobGenerator thread (as well as some other EventLoop threads) may exit on a
fatal error, like OOM, but the Spark Streaming job keeps running with no batch
jobs being generated. Currently, we only report non-fatal errors.
{code}
override def run(): Unit = {
  try {
while (!stopped.get) {
  val event = eventQueue.take()
  try {
onReceive(event)
  } catch {
case NonFatal(e) =>
  try {
onError(e)
  } catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
  }
  }
}
  } catch {
case ie: InterruptedException => // exit even if eventQueue is not empty
case NonFatal(e) => logError("Unexpected error in " + name, e)
  }
}
{code}

In some corner cases, these event threads may exit with an OOM error while the
driver thread keeps running.
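
One possible direction, sketched below against the run() loop quoted above (this is not Spark's actual fix; {{reportFatalError}} is a hypothetical hook shown only for illustration):

{code:scala}
// Same loop as above, but with fatal errors surfaced instead of silently
// killing the event thread. Assumes the same class context and imports
// (scala.util.control.NonFatal, logError, name, stopped, eventQueue).
override def run(): Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } catch {
        case NonFatal(e) =>
          try {
            onError(e)
          } catch {
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
      }
    }
  } catch {
    case ie: InterruptedException => // exit even if eventQueue is not empty
    case NonFatal(e) => logError("Unexpected error in " + name, e)
    case t: Throwable =>
      // Fatal errors (e.g. OutOfMemoryError) previously just killed this
      // thread; report them so the application can choose to fail fast.
      logError("Fatal error in " + name + ", notifying the application", t)
      reportFatalError(t) // hypothetical hook, e.g. stop the StreamingContext
  }
}
{code}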



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27413) Keep the same epoch pace between driver and executor.

2019-04-09 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-27413:
-

 Summary: Keep the same epoch pace between driver and executor.
 Key: SPARK-27413
 URL: https://issues.apache.org/jira/browse/SPARK-27413
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.0.0
Reporter: Genmao Yu


The pace of epoch generation on the driver and of epoch pulling on the executor
can differ. If the epoch pulling interval is larger than the epoch generation
interval, this results in many empty epochs per partition.
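
For illustration, the two paces involved (a hedged sketch run from a spark-shell-style session; the poll-interval property name is assumed from the continuous-processing configuration, and all values are arbitrary examples):

{code:scala}
import org.apache.spark.sql.streaming.Trigger

// Executor side: how often a partition polls for new epochs
// (assumed config name: spark.sql.streaming.continuous.executorPollIntervalMs).
spark.conf.set("spark.sql.streaming.continuous.executorPollIntervalMs", "100")

// Driver side: a new epoch is generated once per trigger interval.
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .option("checkpointLocation", "/tmp/cp-continuous") // illustrative path
  .start()

// If the executor-side poll interval were much larger than the driver-side
// trigger interval, a partition would observe many empty epochs, as described above.
{code}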



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27355) make query execution more sensitive to epoch message late or lost

2019-04-03 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-27355:
-

 Summary: make query execution more sensitive to epoch message late 
or lost
 Key: SPARK-27355
 URL: https://issues.apache.org/jira/browse/SPARK-27355
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.0.0
Reporter: Genmao Yu


In SPARK-23503, we enforce sequencing of committed epochs for Continuous 
Execution. In case a message for epoch n is lost and epoch (n + 1) is ready for 
commit before epoch n is, epoch (n + 1) will wait for epoch n to be committed 
first. In the extreme case, we will wait for `epochBacklogQueueSize` (1 
in default) epochs and then fail. There is no need to wait for such a long 
time before the query fails, and we can make the condition more sensitive.
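
A rough sketch of the sequencing constraint being described (illustrative pseudo-logic only, not the actual EpochCoordinator code):

{code:scala}
import scala.collection.mutable

// Epochs that are ready to commit but are waiting for an earlier epoch.
val epochsWaitingToBeCommitted = mutable.Queue[Long]()

def handleReadyEpoch(epoch: Long, lastCommittedEpoch: Long, maxBacklog: Int): Unit = {
  if (epoch == lastCommittedEpoch + 1) {
    // the next expected epoch: commit it right away
  } else {
    // a message for an earlier epoch is late or lost: queue this one and wait
    epochsWaitingToBeCommitted.enqueue(epoch)
    if (epochsWaitingToBeCommitted.size > maxBacklog) {
      // today the query only fails once the backlog grows this large;
      // the proposal is to detect the late/lost epoch earlier than this
      sys.error(s"Too many epochs ($maxBacklog) waiting to be committed")
    }
  }
}
{code}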



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27218) spark-sql-kafka-0-10 startingOffset=earliest not working as expected with streaming

2019-03-21 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798635#comment-16798635
 ] 

Genmao Yu commented on SPARK-27218:
---

Could you please test it on master branch?

> spark-sql-kafka-0-10 startingOffset=earliest not working as expected with 
> streaming
> ---
>
> Key: SPARK-27218
> URL: https://issues.apache.org/jira/browse/SPARK-27218
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Windows 10, spark-2.4.0-bin-hadoop2.7
>Reporter: Emanuele Sabellico
>Priority: Minor
>
> Hi, I'm trying to stream a kafka topic with spark-sql-kafka-0-10_2.11:2.4.0 
> with a code like this:
> {noformat}
> spark.readStream
> .format("kafka")
> .option("subscribe", "test1")
> .option("startingOffsets", "earliest")
> .load(){noformat}
> I find that Spark doesn't start from the earliest offset but from the latest. 
> Or better, initially it gets the earliest offsets but then it does a seek to 
> end, skipping the messages in-between.
>  In the logs I find this:
> {noformat}
> 2019-03-20 11:59:50 INFO  KafkaMicroBatchReader:54 - Initial offsets: 
> {"test1":{"0":1740}}
> 2019-03-20 11:59:50 INFO  Fetcher:583 - [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-6c3a7acb-91fa-4b57-81f1-c8f7c6c5ab6d--880364893-driver-0]
>  Resetting offset for partition test1-0 to offset 15922.
> {noformat}
> Looking into the code I find that in _KafkaMicroBatchReader.setOffsetRange_ 
> the method _KafkaOffsetReader.fetchLatestOffsets_ is called and here there is 
> a _consumer.seekToEnd(partitions)_
> According to the documentation I was expecting that the streaming would have 
> started from the earliest offset in this case. Is there something that I'm 
> getting wrong or doing wrong?
> Thanks in advance!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-08-23 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591151#comment-16591151
 ] 

Genmao Yu commented on SPARK-24630:
---

[~Jackey Lee] I am glad to participate in code review. 

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-08-01 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565252#comment-16565252
 ] 

Genmao Yu commented on SPARK-24630:
---

[~Jackey Lee] Pretty good! We also have a demand for SQL streaming. Is there any 
more detailed design doc? Alternatively, we could first list the streaming SQL 
syntax that differs from batch.

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-30 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561762#comment-16561762
 ] 

Genmao Yu edited comment on SPARK-24630 at 7/30/18 11:27 AM:
-

Practice to add the StreamSQL DDL, like this:
{code:java}
spark-sql> create source stream service_logs with 
("type"="kafka","bootstrap.servers"="x.x.x.x:9092", "topic"="service_log"); 
Time taken: 1.833 seconds 
spark-sql> desc service_logs;
key binary NULL 
value binary NULL 
topic string NULL 
partition int NULL 
offset bigint NULL 
timestamp timestamp NULL 
timestampType int NULL 
Time taken: 0.394 seconds, Fetched 7 row(s) 
spark-sql> create sink stream analysis with ("type"="kafka", 
"outputMode"="complete", "bootstrap.servers"="x.x.x.x:9092", 
"topic"="analysis", "checkpointLocation"="hdfs:///tmp/cp0"); 
Time taken: 0.027 seconds 
spark-sql> insert into analysis select key, value, count(*) from service_logs 
group by key, value; 
Time taken: 0.355 seconds


{code}


was (Author: unclegen):
Try to add the StreamSQL DDL, like this:
{code:java}
spark-sql> create source stream service_logs with 
("type"="kafka","bootstrap.servers"="x.x.x.x:9092", "topic"="service_log"); 
Time taken: 1.833 seconds 
spark-sql> desc service_logs;
key binary NULL 
value binary NULL 
topic string NULL 
partition int NULL 
offset bigint NULL 
timestamp timestamp NULL 
timestampType int NULL 
Time taken: 0.394 seconds, Fetched 7 row(s) 
spark-sql> create sink stream analysis with ("type"="kafka", 
"outputMode"="complete", "bootstrap.servers"="x.x.x.x:9092", 
"topic"="analysis", "checkpointLocation"="hdfs:///tmp/cp0"); 
Time taken: 0.027 seconds 
spark-sql> insert into analysis select key, value, count(*) from service_logs 
group by key, value; 
Time taken: 0.355 seconds


{code}

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-30 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-24630:
--
Attachment: (was: image-2018-07-30-18-48-38-352.png)

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-30 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-24630:
--
Attachment: (was: image-2018-07-30-18-06-30-506.png)

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-30 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561762#comment-16561762
 ] 

Genmao Yu edited comment on SPARK-24630 at 7/30/18 10:53 AM:
-

Try to add the StreamSQL DDL, like this:
{code:java}
spark-sql> create source stream service_logs with 
("type"="kafka","bootstrap.servers"="x.x.x.x:9092", "topic"="service_log"); 
Time taken: 1.833 seconds 
spark-sql> desc service_logs;
key binary NULL 
value binary NULL 
topic string NULL 
partition int NULL 
offset bigint NULL 
timestamp timestamp NULL 
timestampType int NULL 
Time taken: 0.394 seconds, Fetched 7 row(s) 
spark-sql> create sink stream analysis with ("type"="kafka", 
"outputMode"="complete", "bootstrap.servers"="x.x.x.x:9092", 
"topic"="analysis", "checkpointLocation"="hdfs:///tmp/cp0"); 
Time taken: 0.027 seconds 
spark-sql> insert into analysis select key, value, count(*) from service_logs 
group by key, value; 
Time taken: 0.355 seconds


{code}


was (Author: unclegen):
Try to add the StreamSQL DDL, like this:

!image-2018-07-30-18-48-38-352.png!

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf, 
> image-2018-07-30-18-06-30-506.png, image-2018-07-30-18-48-38-352.png
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-30 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561762#comment-16561762
 ] 

Genmao Yu commented on SPARK-24630:
---

Try to add the StreamSQL DDL, like this:

!image-2018-07-30-18-48-38-352.png!

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf, 
> image-2018-07-30-18-06-30-506.png, image-2018-07-30-18-48-38-352.png
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-30 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-24630:
--
Attachment: image-2018-07-30-18-48-38-352.png

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf, 
> image-2018-07-30-18-06-30-506.png, image-2018-07-30-18-48-38-352.png
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-30 Thread Genmao Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-24630:
--
Attachment: image-2018-07-30-18-06-30-506.png

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf, image-2018-07-30-18-06-30-506.png
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-25 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556837#comment-16556837
 ] 

Genmao Yu edited comment on SPARK-24630 at 7/26/18 3:24 AM:


[~zsxwing] Is there a plan to better support SQL on streaming, e.g. providing 
stream table DDL, window and watermark syntax on streams, etc.?


was (Author: unclegen):
[~zsxwing] Is there plan to better support SQL on streaming?  like provide 
stream table ddl, window ** and watermark  ** syntax on stream etc.

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-25 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556837#comment-16556837
 ] 

Genmao Yu commented on SPARK-24630:
---

[~zsxwing] Is there plan to better support SQL on streaming?  like provide 
stream table ddl, window ** and watermark  ** syntax on stream etc.

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-24 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554043#comment-16554043
 ] 

Genmao Yu edited comment on SPARK-24630 at 7/24/18 11:38 AM:
-

[~zsxwing]

{{Structured Streaming supports standard SQL as the batch queries, so the users 
can switch their queries between batch and streaming easily.}}

IIUC, there are some queries we cannot switch from stream to batch, like 
"groupBy window aggregation" or "over window aggregation" on a stream. 
Isn't it?


was (Author: unclegen):
{{Structured Streaming supports standard SQL as the batch queries, so the users 
can switch their queries between batch and streaming easily.}}

IIUC, there are some queries we can not switch from stream to batch, like 
"*groupBy window a**ggregation"* or "*over window a**ggregation"* on stream. 
Isn't it?

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-24 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554043#comment-16554043
 ] 

Genmao Yu commented on SPARK-24630:
---

{{Structured Streaming supports standard SQL as the batch queries, so the users 
can switch their queries between batch and streaming easily.}}

IIUC, there are some queries we can not switch from stream to batch, like 
"*groupBy window a**ggregation"*** or "*over window a**ggregation"*** on 
stream. Isn't it?

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2018-07-24 Thread Genmao Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554043#comment-16554043
 ] 

Genmao Yu edited comment on SPARK-24630 at 7/24/18 10:07 AM:
-

{{Structured Streaming supports standard SQL as the batch queries, so the users 
can switch their queries between batch and streaming easily.}}

IIUC, there are some queries we can not switch from stream to batch, like 
"*groupBy window a**ggregation"* or "*over window a**ggregation"* on stream. 
Isn't it?


was (Author: unclegen):
{{Structured Streaming supports standard SQL as the batch queries, so the users 
can switch their queries between batch and streaming easily.}}

IIUC, there are some queries we can not switch from stream to batch, like 
"*groupBy window a**ggregation"*** or "*over window a**ggregation"*** on 
stream. Isn't it?

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
>
> At present, KafkaSQL, Flink SQL(which is actually based on Calcite), 
> SQLStream, StormSQL all provide a stream type SQL interface, with which users 
> with little knowledge about streaming,  can easily develop a flow system 
> processing model. In Spark, we can also support SQL API based on 
> StructStreamig.
> To support for SQL Streaming, there are two key points: 
> 1, Analysis should be able to parse streaming type SQL. 
> 2, Analyzer should be able to map metadata information to the corresponding 
> Relation. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20672) Keep the `isStreaming` property in triggerLogicalPlan in Structured Streaming

2017-05-08 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-20672:
-

 Summary: Keep the `isStreaming` property in triggerLogicalPlan in 
Structured Streaming
 Key: SPARK-20672
 URL: https://issues.apache.org/jira/browse/SPARK-20672
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.1.1
Reporter: Genmao Yu


In Structured Streaming, the "isStreaming" property will be eliminated in each 
triggerLogicalPlan. Then, some rules will be applied to this triggerLogicalPlan 
mistakely. So, we should refactor existing code to better execute batch query 
and ss query.
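
For reference, the user-visible flag in question (a small illustrative snippet run from a spark-shell-style session; the socket host and port are placeholders):

{code:scala}
// Streaming logical plans carry isStreaming = true ...
val streamDf = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()
assert(streamDf.isStreaming)

// ... while batch plans do not, which is why batch-only rules must not be
// applied to the per-trigger plan of a streaming query.
val batchDf = spark.range(10).toDF()
assert(!batchDf.isStreaming)
{code}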



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20139) Spark UI reports partial success for completed stage while log shows all tasks are finished

2017-03-30 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950244#comment-15950244
 ] 

Genmao Yu commented on SPARK-20139:
---

The event queue exceeds its capacity, so new events will be dropped. 
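
For reference, a hedged sketch of the knob involved (in Spark 2.1 the property is {{spark.scheduler.listenerbus.eventqueue.size}}; later releases renamed it to {{spark.scheduler.listenerbus.eventqueue.capacity}}, so adjust to your version):

{code:scala}
import org.apache.spark.SparkConf

// Raise the listener-bus queue so slow listeners drop fewer events
// (the default is 10000; the value below is only an example).
val conf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000")
{code}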

> Spark UI reports partial success for completed stage while log shows all 
> tasks are finished
> ---
>
> Key: SPARK-20139
> URL: https://issues.apache.org/jira/browse/SPARK-20139
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.1.0
>Reporter: Etti Gur
> Attachments: screenshot-1.png
>
>
> Spark UI reports partial success for completed stage while log shows all 
> tasks are finished - i.e.:
> We have a stage that is presented under completed stages on spark UI,
> but the successful tasks are shown like so: (146372/524964) not as you'd 
> expect (524964/524964)
> Looking at the application master log shows all tasks in that stage are 
> successful:
> 17/03/29 09:45:49 INFO TaskSetManager: Finished task 522973.0 in stage 0.0 
> (TID 522973) in 1163910 ms on ip-10-1-15-34.ec2.internal (executor 116) 
> (524963/524964)
> 17/03/29 09:45:49 INFO TaskSetManager: Finished task 12508.0 in stage 2.0 
> (TID 537472) in 241250 ms on ip-10-1-15-14.ec2.internal (executor 38) 
> (20234/20262)
> 17/03/29 09:45:49 INFO TaskSetManager: Finished task 12465.0 in stage 2.0 
> (TID 537429) in 241994 ms on ip-10-1-15-106.ec2.internal (executor 133) 
> (20235/20262)
> 17/03/29 09:45:49 INFO TaskSetManager: Finished task 15079.0 in stage 2.0 
> (TID 540043) in 202889 ms on ip-10-1-15-173.ec2.internal (executor 295) 
> (20236/20262)
> 17/03/29 09:45:49 INFO TaskSetManager: Finished task 19828.0 in stage 2.0 
> (TID 544792) in 137845 ms on ip-10-1-15-147.ec2.internal (executor 43) 
> (20237/20262)
> 17/03/29 09:45:50 INFO TaskSetManager: Finished task 19072.0 in stage 2.0 
> (TID 544036) in 147363 ms on ip-10-1-15-19.ec2.internal (executor 175) 
> (20238/20262)
> 17/03/29 09:45:50 INFO TaskSetManager: Finished task 524146.0 in stage 0.0 
> (TID 524146) in 889950 ms on ip-10-1-15-72.ec2.internal (executor 74) 
> (524964/524964)
> Also in the log we get an error:
> 17/03/29 08:24:16 ERROR LiveListenerBus: Dropping SparkListenerEvent because 
> no remaining room in event queue. This likely means one of the SparkListeners 
> is too slow and cannot keep up with the rate at which tasks are being started 
> by the scheduler.
> This looks like the stage is indeed completed with all its tasks but UI shows 
> like not all tasks really finished.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20065) Empty output files created for aggregation query in append mode

2017-03-23 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15937904#comment-15937904
 ] 

Genmao Yu commented on SPARK-20065:
---

Makes sense, I will give a quick update.

> Empty output files created for aggregation query in append mode
> ---
>
> Key: SPARK-20065
> URL: https://issues.apache.org/jira/browse/SPARK-20065
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Silvio Fiorito
>
> I've got a Kafka topic which I'm querying, running a windowed aggregation, 
> with a 30 second watermark, 10 second trigger, writing out to Parquet with 
> append output mode.
> Every 10 second trigger generates a file, regardless of whether there was any 
> data for that trigger, or whether any records were actually finalized by the 
> watermark.
> Is this expected behavior or should it not write out these empty files?
> {code}
> val df = spark.readStream.format("kafka")
> val query = df
>   .withWatermark("timestamp", "30 seconds")
>   .groupBy(window($"timestamp", "10 seconds"))
>   .count()
>   .select(date_format($"window.start", "HH:mm:ss").as("time"), $"count")
> query
>   .writeStream
>   .format("parquet")
>   .option("checkpointLocation", aggChk)
>   .trigger(ProcessingTime("10 seconds"))
>   .outputMode("append")
>   .start(aggPath)
> {code}
> As the query executes, do a file listing on "aggPath" and you'll see 339 byte 
> files at a minimum until we arrive at the first watermark and the initial 
> batch is finalized. Even after that though, as there are empty batches it'll 
> keep generating empty files every trigger.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20061) Reading a file with colon (:) from S3 fails with URISyntaxException

2017-03-22 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15937675#comment-15937675
 ] 

Genmao Yu commented on SPARK-20061:
---

Colons are not supported in Hadoop paths; see 
[HDFS-13|https://issues.apache.org/jira/browse/HDFS-13].
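
A minimal reproduction of that limitation (the file name below is shortened but keeps the problematic colon):

{code:scala}
import org.apache.hadoop.fs.Path

// Throws IllegalArgumentException caused by java.net.URISyntaxException:
// "Relative path in absolute URI", because everything before the first ':'
// is parsed as a URI scheme.
val p = new Path("2017-01-06T20:33:45.255-sample.json")
{code}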

> Reading a file with colon (:) from S3 fails with URISyntaxException
> ---
>
> Key: SPARK-20061
> URL: https://issues.apache.org/jira/browse/SPARK-20061
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
> Environment: EC2, AWS
>Reporter: Michel Lemay
>
> When reading a bunch of files from s3 using wildcards, it fails with the 
> following exception:
> {code}
> scala> val fn = "s3a://mybucket/path/*/"
> scala> val ds = spark.readStream.schema(schema).json(fn)
> java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative 
> path in absolute URI: 
> 2017-01-06T20:33:45.255-analyticsqa-49569270507599054034141623773442922465540524816321216514.json
>   at org.apache.hadoop.fs.Path.initialize(Path.java:205)
>   at org.apache.hadoop.fs.Path.<init>(Path.java:171)
>   at org.apache.hadoop.fs.Path.<init>(Path.java:93)
>   at org.apache.hadoop.fs.Globber.glob(Globber.java:241)
>   at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1657)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil.globPath(SparkHadoopUtil.scala:237)
>   at 
> org.apache.spark.deploy.SparkHadoopUtil.globPathIfNecessary(SparkHadoopUtil.scala:243)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$2.apply(DataSource.scala:131)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$2.apply(DataSource.scala:127)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.immutable.List.flatMap(List.scala:344)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:127)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:124)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:138)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:229)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
>   at 
> org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
>   at 
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
>   at 
> org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:133)
>   at 
> org.apache.spark.sql.streaming.DataStreamReader.json(DataStreamReader.scala:181)
>   ... 50 elided
> Caused by: java.net.URISyntaxException: Relative path in absolute URI: 
> 2017-01-06T20:33:45.255-analyticsqa-49569270507599054034141623773442922465540524816321216514.json
>   at java.net.URI.checkPath(URI.java:1823)
>   at java.net.URI.<init>(URI.java:745)
>   at org.apache.hadoop.fs.Path.initialize(Path.java:202)
>   ... 73 more
> {code}
> The file in question sits at the root of s3a://mybucket/path/
> {code}
> aws s3 ls s3://mybucket/path/
>PRE subfolder1/
>PRE subfolder2/
> ...
> 2017-01-06 20:33:46   1383 
> 2017-01-06T20:33:45.255-analyticsqa-49569270507599054034141623773442922465540524816321216514.json
> ...
> {code}
> Removing the wildcard from path make it work but it obviously does misses all 
> files in subdirectories.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20021) Miss backslash in python code

2017-03-19 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-20021:
-

 Summary: Miss backslash in python code
 Key: SPARK-20021
 URL: https://issues.apache.org/jira/browse/SPARK-20021
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19926) Make pyspark exception more readable

2017-03-12 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-19926:
--
Description: 
Exception in pyspark is a little difficult to read.

like:

{code}
Traceback (most recent call last):
  File "", line 5, in 
  File "/root/dev/spark/dist/python/pyspark/sql/streaming.py", line 853, in 
start
return self._sq(self._jwrite.start())
  File 
"/root/dev/spark/dist/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", 
line 1133, in __call__
  File "/root/dev/spark/dist/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Append output mode not supported when 
there are streaming aggregations on streaming DataFrames/DataSets without 
watermark;;\nAggregate [window#17, word#5], [window#17 AS window#11, word#5, 
count(1) AS count#16L]\n+- Filter ((t#6 >= window#17.start) && (t#6 < 
window#17.end))\n   +- Expand [ArrayBuffer(named_struct(start, 
CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as 
double))) + cast(0 as bigint)) - cast(1 as bigint)) * 3000) + 0), end, 
(CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as 
double))) + cast(0 as bigint)) - cast(1 as bigint)) * 3000) + 0) + 
3000)), word#5, t#6-T3ms), ArrayBuffer(named_struct(start, 
CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as 
double))) + cast(1 as bigint)) - cast(1 as bigint)) * 3000) + 0), end, 
(CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as 
double))) + cast(1 as bigint)) - cast(1 as bigint)) * 3000) + 0) + 
3000)), word#5, t#6-T3ms)], [window#17, word#5, t#6-T3ms]\n  +- 
EventTimeWatermark t#6: timestamp, interval 30 seconds\n +- Project 
[cast(word#0 as string) AS word#5, cast(t#1 as timestamp) AS t#6]\n
+- StreamingRelation 
DataSource(org.apache.spark.sql.SparkSession@c4079ca,csv,List(),Some(StructType(StructField(word,StringType,true),
 StructField(t,IntegerType,true))),List(),None,Map(sep -> ;, path -> 
/tmp/data),None), FileSource[/tmp/data], [word#0, t#1]\n'
{code}


> Make pyspark exception more readable
> 
>
> Key: SPARK-19926
> URL: https://issues.apache.org/jira/browse/SPARK-19926
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Genmao Yu
>Priority: Minor
>
> Exceptions in pyspark are a little difficult to read.
> For example:
> {code}
> Traceback (most recent call last):
>   File "", line 5, in 
>   File "/root/dev/spark/dist/python/pyspark/sql/streaming.py", line 853, in 
> start
> return self._sq(self._jwrite.start())
>   File 
> "/root/dev/spark/dist/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", 
> line 1133, in __call__
>   File "/root/dev/spark/dist/python/pyspark/sql/utils.py", line 69, in deco
> raise AnalysisException(s.split(': ', 1)[1], stackTrace)
> pyspark.sql.utils.AnalysisException: u'Append output mode not supported when 
> there are streaming aggregations on streaming DataFrames/DataSets without 
> watermark;;\nAggregate [window#17, word#5], [window#17 AS window#11, word#5, 
> count(1) AS count#16L]\n+- Filter ((t#6 >= window#17.start) && (t#6 < 
> window#17.end))\n   +- Expand [ArrayBuffer(named_struct(start, 
> CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as 
> double))) + cast(0 as bigint)) - cast(1 as bigint)) * 3000) + 0), end, 
> (CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as 
> double))) + cast(0 as bigint)) - cast(1 as bigint)) * 3000) + 0) + 
> 3000)), word#5, t#6-T3ms), ArrayBuffer(named_struct(start, 
> CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as 
> double))) + cast(1 as bigint)) - cast(1 as bigint)) * 3000) + 0), end, 
> (CEIL((cast((precisetimestamp(t#6) - 0) as double) / cast(3000 as 
> double))) + cast(1 as bigint)) - cast(1 as bigint)) * 3000) + 0) + 
> 3000)), word#5, t#6-T3ms)], [window#17, word#5, t#6-T3ms]\n  
> +- EventTimeWatermark t#6: timestamp, interval 30 seconds\n +- 
> Project [cast(word#0 as string) AS word#5, cast(t#1 as timestamp) AS t#6]\n   
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@c4079ca,csv,List(),Some(StructType(StructField(word,StringType,true),
>  StructField(t,IntegerType,true))),List(),None,Map(sep -> ;, path -> 
> /tmp/data),None), FileSource[/tmp/data], [word#0, t#1]\n'
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19926) Make pyspark exception more readable

2017-03-12 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-19926:
-

 Summary: Make pyspark exception more readable
 Key: SPARK-19926
 URL: https://issues.apache.org/jira/browse/SPARK-19926
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19853) Uppercase Kafka topics fail when startingOffsets are SpecificOffsets

2017-03-08 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901158#comment-15901158
 ] 

Genmao Yu commented on SPARK-19853:
---

Good catch! I will open a PR to fix this. Could you please help review it?

> Uppercase Kafka topics fail when startingOffsets are SpecificOffsets
> 
>
> Key: SPARK-19853
> URL: https://issues.apache.org/jira/browse/SPARK-19853
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Chris Bowden
>Priority: Trivial
>
> When using the KafkaSource with Structured Streaming, consumer assignments 
> are not what the user expects if startingOffsets is set to an explicit set of 
> topics/partitions in JSON where the topic(s) happen to have uppercase 
> characters. When StartingOffsets is constructed, the original string value 
> from options is transformed toLowerCase to make matching on "earliest" and 
> "latest" case insensitive. However, the toLowerCase json is passed to 
> SpecificOffsets for the terminal condition, so topic names may not be what 
> the user intended by the time assignments are made with the underlying 
> KafkaConsumer.
> From KafkaSourceProvider:
> {code}
> val startingOffsets = 
> caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase)
>  match {
> case Some("latest") => LatestOffsets
> case Some("earliest") => EarliestOffsets
> case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
> case None => LatestOffsets
>   }
> {code}
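
One possible shape of the fix (a sketch only, assuming the surrounding KafkaSourceProvider context shown above): compare only the keywords case-insensitively, and pass the untouched JSON to SpecificOffsets so topic names keep their original case.

{code}
val startingOffsets =
  caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
    // compare the keywords case-insensitively...
    case Some(offsets) if offsets.toLowerCase == "latest" => LatestOffsets
    case Some(offsets) if offsets.toLowerCase == "earliest" => EarliestOffsets
    // ...but keep the original JSON, preserving upper-case topic names
    case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    case None => LatestOffsets
  }
{code}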



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19861) watermark should not be a negative time.

2017-03-07 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-19861:
-

 Summary: watermark should not be a negative time.
 Key: SPARK-19861
 URL: https://issues.apache.org/jira/browse/SPARK-19861
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19822) CheckpointSuite.testCheckpointedOperation: should not check checkpointFilesOfLatestTime by the PATH string.

2017-03-04 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-19822:
-

 Summary: CheckpointSuite.testCheckpointedOperation: should not 
check checkpointFilesOfLatestTime by the PATH string.
 Key: SPARK-19822
 URL: https://issues.apache.org/jira/browse/SPARK-19822
 Project: Spark
  Issue Type: Bug
  Components: Tests
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19807) Add reason for cancellation when a stage is killed using web UI

2017-03-03 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894185#comment-15894185
 ] 

Genmao Yu edited comment on SPARK-19807 at 3/3/17 11:35 AM:


!https://cloud.githubusercontent.com/assets/7402327/23549702/6a0c93f6-0048-11e7-8a3f-bf58befb887b.png!

Do you mean the "Job 0 cancelled" in picture?


was (Author: unclegen):
!https://cloud.githubusercontent.com/assets/7402327/23549478/70888646-0047-11e7-8e2c-e64a3db43711.png!

Do you mean the "Job 0 cancelled" in picture?

> Add reason for cancellation when a stage is killed using web UI
> ---
>
> Key: SPARK-19807
> URL: https://issues.apache.org/jira/browse/SPARK-19807
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.1.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>
> When a user kills a stage using web UI (in Stages page), 
> {{StagesTab.handleKillRequest}} requests {{SparkContext}} to cancel the stage 
> without giving a reason. {{SparkContext}} has {{cancelStage(stageId: Int, 
> reason: String)}} that Spark could use to pass the information for 
> monitoring/debugging purposes.
> {code}
> scala> sc.range(0, 5, 1, 1).mapPartitions { nums => { Thread.sleep(60 * 
> 1000); nums } }.count
> {code}
> Use http://localhost:4040/stages/ and click Kill.
> {code}
> org.apache.spark.SparkException: Job 0 cancelled because Stage 0 was cancelled
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1486)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1426)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1415)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1408)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1656)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1645)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2019)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2040)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2059)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
>   at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
>   ... 48 elided
> {code}
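
A rough sketch (not the actual Spark code) of what the UI kill handler could do with the two-argument overload mentioned in the description; the parameter handling and permission checks are simplified assumptions:

{code}
import javax.servlet.http.HttpServletRequest
import org.apache.spark.SparkContext

def handleKillRequest(sc: SparkContext, request: HttpServletRequest): Unit = {
  Option(request.getParameter("id")).map(_.toInt).foreach { stageId =>
    // pass a human-readable reason instead of cancelling silently
    sc.cancelStage(stageId, s"Stage $stageId was killed from the Web UI")
  }
}
{code}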



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19807) Add reason for cancellation when a stage is killed using web UI

2017-03-03 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894185#comment-15894185
 ] 

Genmao Yu commented on SPARK-19807:
---

!https://cloud.githubusercontent.com/assets/7402327/23549478/70888646-0047-11e7-8e2c-e64a3db43711.png!

Do you mean the "Job 0 cancelled" in picture?

> Add reason for cancellation when a stage is killed using web UI
> ---
>
> Key: SPARK-19807
> URL: https://issues.apache.org/jira/browse/SPARK-19807
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.1.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>
> When a user kills a stage using web UI (in Stages page), 
> {{StagesTab.handleKillRequest}} requests {{SparkContext}} to cancel the stage 
> without giving a reason. {{SparkContext}} has {{cancelStage(stageId: Int, 
> reason: String)}} that Spark could use to pass the information for 
> monitoring/debugging purposes.
> {code}
> scala> sc.range(0, 5, 1, 1).mapPartitions { nums => { Thread.sleep(60 * 
> 1000); nums } }.count
> {code}
> Use http://localhost:4040/stages/ and click Kill.
> {code}
> org.apache.spark.SparkException: Job 0 cancelled because Stage 0 was cancelled
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1486)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1426)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1415)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1408)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1656)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1645)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2019)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2040)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2059)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
>   at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
>   ... 48 elided
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19805) Log the row type when query result dose not match

2017-03-02 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-19805:
--
Summary: Log the row type when query result dose not match  (was: Log the 
row type when query result dose match)

> Log the row type when query result dose not match
> -
>
> Key: SPARK-19805
> URL: https://issues.apache.org/jira/browse/SPARK-19805
> Project: Spark
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Genmao Yu
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19805) Log the row type when query result dose match

2017-03-02 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-19805:
-

 Summary: Log the row type when query result dose match
 Key: SPARK-19805
 URL: https://issues.apache.org/jira/browse/SPARK-19805
 Project: Spark
  Issue Type: Improvement
  Components: Tests
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-19349) Check resource ready to avoid multiple receivers to be scheduled on the same node.

2017-03-02 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu closed SPARK-19349.
-
Resolution: Won't Fix

> Check resource ready to avoid multiple receivers to be scheduled on the same 
> node.
> --
>
> Key: SPARK-19349
> URL: https://issues.apache.org/jira/browse/SPARK-19349
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Genmao Yu
>
> Currently, we can only ensure that the registered resources satisfy 
> "spark.scheduler.minRegisteredResourcesRatio". But if 
> "spark.scheduler.minRegisteredResourcesRatio" is set too small, receivers may 
> still be scheduled onto only a few nodes. In fact, we could wait one more round 
> for sufficient resources so that receivers are scheduled evenly.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19800) Implement one kind of streaming sampling - reservoir sampling

2017-03-02 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-19800:
-

 Summary: Implement one kind of streaming sampling - reservoir 
sampling
 Key: SPARK-19800
 URL: https://issues.apache.org/jira/browse/SPARK-19800
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19738) Consider adding error handler to DataStreamWriter

2017-02-27 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887218#comment-15887218
 ] 

Genmao Yu commented on SPARK-19738:
---

[~jlalwani] I tested it on the latest master branch, and it returns NULL when it 
meets bad data.

> Consider adding error handler to DataStreamWriter
> -
>
> Key: SPARK-19738
> URL: https://issues.apache.org/jira/browse/SPARK-19738
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Jayesh lalwani
>
> For Structured streaming implementations, it is important that the 
> applications stay always On. However, right now, errors stop the driver. In 
> some cases, this is not desirable behavior. For example, I have the following 
> application
> {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = 
> spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").start()
> {code}
> I send the following input to it 
> {quote}
> 1,Iron man
> 2,SUperman
> {quote}
> Obviously, the data is bad. This causes the executor to throw an exception 
> that propagates to the driver, which promptly shuts down. The driver is 
> running in supervised mode, and it gets restarted. The application reads the 
> same bad input and shuts down again. This goes on ad infinitum. This behavior 
> is desirable in cases where the error is recoverable. For example, if the executor 
> cannot talk to the database, we want the application to keep trying the same 
> input again and again till the database recovers. However, for some cases, 
> this behavior is undesirable. We do not want this to happen when the input is 
> bad. We want to put the bad record in some sort of dead letter queue. Or 
> maybe we want to kill the driver only when the number of errors has crossed 
> a certain threshold. Or maybe we want to email someone.
> Proposal:
> Add an error handler to the data stream. When the executor fails, it should 
> call the error handler and pass the Exception to the error handler. The error 
> handler could eat the exception, or transform it, or update counts in an 
> accumulator, etc
>  {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = 
> spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start()
> {code}
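
To make the proposal concrete, a purely hypothetical sketch: neither this trait nor the {{errorhandler(...)}} option exists in Spark, it only illustrates the kind of hook the description asks for.

{code}
trait StreamingErrorHandler extends Serializable {
  /** Return true to swallow the error and keep the query running, false to fail it. */
  def handle(badRecord: Option[String], error: Throwable): Boolean
}

class DeadLetterQueueHandler extends StreamingErrorHandler {
  override def handle(badRecord: Option[String], error: Throwable): Boolean = {
    // e.g. push the offending record to a dead letter queue, bump a counter, alert, ...
    badRecord.foreach(r => println(s"dead-lettered: $r (${error.getMessage})"))
    true // swallow and continue
  }
}
{code}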



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19738) Consider adding error handler to DataStreamWriter

2017-02-27 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885374#comment-15885374
 ] 

Genmao Yu edited comment on SPARK-19738 at 2/27/17 9:44 AM:


[~gaaldornick] Sorry I can not reproduce it. What spark version are you testint 
with? 


was (Author: unclegen):
[~gaaldornick] Sorry I can not reproduce it.

> Consider adding error handler to DataStreamWriter
> -
>
> Key: SPARK-19738
> URL: https://issues.apache.org/jira/browse/SPARK-19738
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Jayesh lalwani
>
> For Structured streaming implementations, it is important that the 
> applications stay always On. However, right now, errors stop the driver. In 
> some cases, this is not desirable behavior. For example, I have the following 
> application
> {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = 
> spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").start()
> {code}
> I send the following input to it 
> {quote}
> 1,Iron man
> 2,SUperman
> {quote}
> Obviously, the data is bad. This causes the executor to throw an exception 
> that propagates to the driver, which promptly shuts down. The driver is 
> running in supervised mode, and it gets restarted. The application reads the 
> same bad input and shuts down again. This goes on ad infinitum. This behavior 
> is desirable in cases where the error is recoverable. For example, if the executor 
> cannot talk to the database, we want the application to keep trying the same 
> input again and again till the database recovers. However, for some cases, 
> this behavior is undesirable. We do not want this to happen when the input is 
> bad. We want to put the bad record in some sort of dead letter queue. Or 
> maybe we want to kill the driver only when the number of errors has crossed 
> a certain threshold. Or maybe we want to email someone.
> Proposal:
> Add an error handler to the data stream. When the executor fails, it should 
> call the error handler and pass the Exception to the error handler. The error 
> handler could eat the exception, or transform it, or update counts in an 
> accumulator, etc
>  {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = 
> spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19738) Consider adding error handler to DataStreamWriter

2017-02-27 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885374#comment-15885374
 ] 

Genmao Yu edited comment on SPARK-19738 at 2/27/17 9:45 AM:


[~gaaldornick] Sorry I can not reproduce it. What spark version are you testing 
with? 


was (Author: unclegen):
[~gaaldornick] Sorry I can not reproduce it. What spark version are you testint 
with? 

> Consider adding error handler to DataStreamWriter
> -
>
> Key: SPARK-19738
> URL: https://issues.apache.org/jira/browse/SPARK-19738
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Jayesh lalwani
>
> For Structured streaming implementations, it is important that the 
> applications stay always On. However, right now, errors stop the driver. In 
> some cases, this is not desirable behavior. For example, I have the following 
> application
> {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = 
> spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").start()
> {code}
> I send the following input to it 
> {quote}
> 1,Iron man
> 2,SUperman
> {quote}
> Obviously, the data is bad. This causes the executor to throw an exception 
> that propagates to the driver, which promptly shuts down. The driver is 
> running in supervised mode, and it gets restarted. The application reads the 
> same bad input and shuts down again. This goes on ad infinitum. This behavior 
> is desirable in cases where the error is recoverable. For example, if the executor 
> cannot talk to the database, we want the application to keep trying the same 
> input again and again till the database recovers. However, for some cases, 
> this behavior is undesirable. We do not want this to happen when the input is 
> bad. We want to put the bad record in some sort of dead letter queue. Or 
> maybe we want to kill the driver only when the number of errors has crossed 
> a certain threshold. Or maybe we want to email someone.
> Proposal:
> Add an error handler to the data stream. When the executor fails, it should 
> call the error handler and pass the Exception to the error handler. The error 
> handler could eat the exception, or transform it, or update counts in an 
> accumulator, etc
>  {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = 
> spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19749) Name socket source with a meaningful name

2017-02-27 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-19749:
-

 Summary: Name socket source with a meaningful name
 Key: SPARK-19749
 URL: https://issues.apache.org/jira/browse/SPARK-19749
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19738) Consider adding error handler to DataStreamWriter

2017-02-27 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885374#comment-15885374
 ] 

Genmao Yu commented on SPARK-19738:
---

[~gaaldornick] Sorry I can not reproduce it.

> Consider adding error handler to DataStreamWriter
> -
>
> Key: SPARK-19738
> URL: https://issues.apache.org/jira/browse/SPARK-19738
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Jayesh lalwani
>
> For Structured streaming implementations, it is important that the 
> applications stay always On. However, right now, errors stop the driver. In 
> some cases, this is not desirable behavior. For example, I have the following 
> application
> {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = 
> spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").start()
> {code}
> I send the following input to it 
> {quote}
> 1,Iron man
> 2,SUperman
> {quote}
> Obviously, the data is bad. This causes the executor to throw an exception 
> that propagates to the driver, which promptly shuts down. The driver is 
> running in supervised mode, and it gets restarted. The application reads the 
> same bad input and shuts down again. This goes on ad infinitum. This behavior 
> is desirable in cases where the error is recoverable. For example, if the executor 
> cannot talk to the database, we want the application to keep trying the same 
> input again and again till the database recovers. However, for some cases, 
> this behavior is undesirable. We do not want this to happen when the input is 
> bad. We want to put the bad record in some sort of dead letter queue. Or 
> maybe we want to kill the driver only when the number of errors has crossed 
> a certain threshold. Or maybe we want to email someone.
> Proposal:
> Add an error handler to the data stream. When the executor fails, it should 
> call the error handler and pass the Exception to the error handler. The error 
> handler could eat the exception, or transform it, or update counts in an 
> accumulator, etc
>  {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = 
> spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-19699) createOrReplaceTable does not always replace an existing table of the same name

2017-02-24 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-19699:
--
Comment: was deleted

(was: Good catch! Maybe we can add {{rdd.id}} or something else. [~cloud_fan] 
What's your opinion?

!https://cloud.githubusercontent.com/assets/7402327/23299586/02f7b73a-fabd-11e6-8daf-321ca9ab5ed0.png!)

> createOrReplaceTable does not always replace an existing table of the same 
> name
> ---
>
> Key: SPARK-19699
> URL: https://issues.apache.org/jira/browse/SPARK-19699
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Barry Becker
>Priority: Minor
>
> There are cases when dataframe.createOrReplaceTempView does not replace an 
> existing table with the same name.
> Please also refer to my [related stack-overflow 
> post|http://stackoverflow.com/questions/42371690/in-spark-2-1-how-come-the-dataframe-createoreplacetemptable-does-not-replace-an].
> To reproduce, do
> {code}
> df.collect()
> df.createOrReplaceTempView("foo1")
> df.sqlContext.cacheTable("foo1")
> {code}
> with one dataframe, and then do exactly the same thing with a different 
> dataframe. Then look in the storage tab in the spark UI and see multiple 
> entries for "foo1" in the "RDD Name" column.
> Maybe I am misunderstanding, but this causes 2 apparent problems
> 1) How do you know which table will be retrieved with 
> sqlContext.table("foo1") ?
> 2) The duplicate entries represent a memory leak. 
>   I have tried calling dropTempTable(existingName) first, but then have 
> occasionally seen a FAILFAST error when trying to use the table. It's as if 
> the dropTempTable is not synchronous, but maybe I am doing something wrong.
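
As a workaround sketch (assuming the Spark 2.1 {{Catalog}} APIs; the helper name is made up), explicitly uncaching and dropping the old view before re-registering keeps the storage tab from accumulating same-named entries:

{code}
import org.apache.spark.sql.DataFrame

def replaceCachedView(df: DataFrame, name: String): Unit = {
  val spark = df.sparkSession
  if (spark.catalog.tableExists(name)) {
    spark.catalog.uncacheTable(name) // release the old cached blocks first
    spark.catalog.dropTempView(name)
  }
  df.createOrReplaceTempView(name)
  spark.catalog.cacheTable(name)
}
{code}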



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19699) createOrReplaceTable does not always replace an existing table of the same name

2017-02-24 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882385#comment-15882385
 ] 

Genmao Yu commented on SPARK-19699:
---

Good catch! Maybe we can add {{rdd.id}} or something else. [~cloud_fan]

!https://cloud.githubusercontent.com/assets/7402327/23299586/02f7b73a-fabd-11e6-8daf-321ca9ab5ed0.png!

> createOrReplaceTable does not always replace an existing table of the same 
> name
> ---
>
> Key: SPARK-19699
> URL: https://issues.apache.org/jira/browse/SPARK-19699
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Barry Becker
>Priority: Minor
>
> There are cases when dataframe.createOrReplaceTempView does not replace an 
> existing table with the same name.
> Please also refer to my [related stack-overflow 
> post|http://stackoverflow.com/questions/42371690/in-spark-2-1-how-come-the-dataframe-createoreplacetemptable-does-not-replace-an].
> To reproduce, do
> {code}
> df.collect()
> df.createOrReplaceTempView("foo1")
> df.sqlContext.cacheTable("foo1")
> {code}
> with one dataframe, and then do exactly the same thing with a different 
> dataframe. Then look in the storage tab in the spark UI and see multiple 
> entries for "foo1" in the "RDD Name" column.
> Maybe I am misunderstanding, but this causes 2 apparent problems
> 1) How do you know which table will be retrieved with 
> sqlContext.table("foo1") ?
> 2) The duplicate entries represent a memory leak. 
>   I have tried calling dropTempTable(existingName) first, but then have 
> occasionally seen a FAILFAST error when trying to use the table. It's as if 
> the dropTempTable is not synchronous, but maybe I am doing something wrong.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19699) createOrReplaceTable does not always replace an existing table of the same name

2017-02-24 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882385#comment-15882385
 ] 

Genmao Yu edited comment on SPARK-19699 at 2/24/17 10:16 AM:
-

Good catch! Maybe we can add {{rdd.id}} or something else. [~cloud_fan] What's 
your opinion?

!https://cloud.githubusercontent.com/assets/7402327/23299586/02f7b73a-fabd-11e6-8daf-321ca9ab5ed0.png!


was (Author: unclegen):
Good catch! Maybe we can add {{rdd.id}} or something else. [~cloud_fan]

!https://cloud.githubusercontent.com/assets/7402327/23299586/02f7b73a-fabd-11e6-8daf-321ca9ab5ed0.png!

> createOrReplaceTable does not always replace an existing table of the same 
> name
> ---
>
> Key: SPARK-19699
> URL: https://issues.apache.org/jira/browse/SPARK-19699
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Barry Becker
>Priority: Minor
>
> There are cases when dataframe.createOrReplaceTempView does not replace an 
> existing table with the same name.
> Please also refer to my [related stack-overflow 
> post|http://stackoverflow.com/questions/42371690/in-spark-2-1-how-come-the-dataframe-createoreplacetemptable-does-not-replace-an].
> To reproduce, do
> {code}
> df.collect()
> df.createOrReplaceTempView("foo1")
> df.sqlContext.cacheTable("foo1")
> {code}
> with one dataframe, and then do exactly the same thing with a different 
> dataframe. Then look in the storage tab in the spark UI and see multiple 
> entries for "foo1" in the "RDD Name" column.
> Maybe I am misunderstanding, but this causes 2 apparent problems
> 1) How do you know which table will be retrieved with 
> sqlContext.table("foo1") ?
> 2) The duplicate entries represent a memory leak. 
>   I have tried calling dropTempTable(existingName) first, but then have 
> occasionally seen a FAILFAST error when trying to use the table. It's as if 
> the dropTempTable is not synchronous, but maybe I am doing something wrong.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-19642) Improve the security guarantee for rest api and ui

2017-02-21 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu closed SPARK-19642.
-
Resolution: Won't Fix

> Improve the security guarantee for rest api and ui
> --
>
> Key: SPARK-19642
> URL: https://issues.apache.org/jira/browse/SPARK-19642
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Genmao Yu
>
> As Spark gets more and more features, data may start leaking through other 
> places (e.g. SQL query plans which are shown in the UI). Also current rest 
> api may be a security hole. Open this JIRA to research and address the 
> potential security flaws.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19676) Flaky test: FsHistoryProviderSuite.SPARK-3697: ignore directories that cannot be read.

2017-02-21 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-19676:
-

 Summary: Flaky test: FsHistoryProviderSuite.SPARK-3697: ignore 
directories that cannot be read.
 Key: SPARK-19676
 URL: https://issues.apache.org/jira/browse/SPARK-19676
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19642) Improve the security guarantee for rest api and ui

2017-02-16 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-19642:
--
Summary: Improve the security guarantee for rest api and ui  (was: Improve 
the security guarantee for rest api)

> Improve the security guarantee for rest api and ui
> --
>
> Key: SPARK-19642
> URL: https://issues.apache.org/jira/browse/SPARK-19642
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Genmao Yu
>Priority: Critical
>
> As Spark gets more and more features, data may start leaking through other 
> places (e.g. SQL query plans which are shown in the UI). Also current rest 
> api may be a security hole. Open this JIRA to research and address the 
> potential security flaws.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19642) Improve the security guarantee for rest api

2017-02-16 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-19642:
--
Description: As Spark gets more and more features, data may start leaking 
through other places (e.g. SQL query plans which are shown in the UI). Also 
current rest api may be a security hole. Open this JIRA to research and address 
the potential security flaws.  (was: As Spark gets more and more features, data 
may start leaking through other places (e.g. SQL query plans which are shown in 
the UI). Also current rest api may be a security hole. Opening this JIRA to 
research and address the potential security flaws.)

> Improve the security guarantee for rest api
> ---
>
> Key: SPARK-19642
> URL: https://issues.apache.org/jira/browse/SPARK-19642
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Genmao Yu
>Priority: Critical
>
> As Spark gets more and more features, data may start leaking through other 
> places (e.g. SQL query plans which are shown in the UI). Also current rest 
> api may be a security hole. Open this JIRA to research and address the 
> potential security flaws.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19642) Improve the security guarantee for rest api

2017-02-16 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-19642:
--
Description: As Spark gets more and more features, data may start leaking 
through other places (e.g. SQL query plans which are shown in the UI). Also 
current rest api may be a security hole. Opening this JIRA to research and 
address the potential security flaws.  (was: As Spark gets more and more 
features, data may start leaking through other places (e.g. SQL query plans 
which are shown in the UI). And current rest api may be a security hole. 
Opening this JIRA to research and address the potential security flaws.)

> Improve the security guarantee for rest api
> ---
>
> Key: SPARK-19642
> URL: https://issues.apache.org/jira/browse/SPARK-19642
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Genmao Yu
>Priority: Critical
>
> As Spark gets more and more features, data may start leaking through other 
> places (e.g. SQL query plans which are shown in the UI). Also current rest 
> api may be a security hole. Opening this JIRA to research and address the 
> potential security flaws.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19642) Improve the security guarantee for rest api

2017-02-16 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871058#comment-15871058
 ] 

Genmao Yu commented on SPARK-19642:
---

cc [~ajbozarth], [~vanzin] and [~srowen]

> Improve the security guarantee for rest api
> ---
>
> Key: SPARK-19642
> URL: https://issues.apache.org/jira/browse/SPARK-19642
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Genmao Yu
>Priority: Critical
>
> As Spark gets more and more features, data may start leaking through other 
> places (e.g. SQL query plans which are shown in the UI). And current rest api 
> may be a security hole. Opening this JIRA to research and address the 
> potential security flaws.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19642) Improve the security guarantee for rest api

2017-02-16 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-19642:
-

 Summary: Improve the security guarantee for rest api
 Key: SPARK-19642
 URL: https://issues.apache.org/jira/browse/SPARK-19642
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu
Priority: Critical


As Spark gets more and more features, data may start leaking through other 
places (e.g. SQL query plans which are shown in the UI). And current rest api 
may be a security hole. Opening this JIRA to research and address the potential 
security flaws.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19605) Fail it if existing resource is not enough to run streaming job

2017-02-14 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-19605:
-

 Summary: Fail it if existing resource is not enough to run 
streaming job
 Key: SPARK-19605
 URL: https://issues.apache.org/jira/browse/SPARK-19605
 Project: Spark
  Issue Type: Improvement
  Components: DStreams
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19556) Broadcast data is not encrypted when I/O encryption is on

2017-02-14 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867152#comment-15867152
 ] 

Genmao Yu commented on SPARK-19556:
---

[~vanzin] I am working on this, could you please assign it to me?


> Broadcast data is not encrypted when I/O encryption is on
> -
>
> Key: SPARK-19556
> URL: https://issues.apache.org/jira/browse/SPARK-19556
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Marcelo Vanzin
>
> {{TorrentBroadcast}} uses a couple of "back doors" into the block manager to 
> write and read data:
> {code}
>   if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, 
> tellMaster = true)) {
> throw new SparkException(s"Failed to store $pieceId of $broadcastId 
> in local BlockManager")
>   }
> {code}
> {code}
>   bm.getLocalBytes(pieceId) match {
> case Some(block) =>
>   blocks(pid) = block
>   releaseLock(pieceId)
> case None =>
>   bm.getRemoteBytes(pieceId) match {
> case Some(b) =>
>   if (checksumEnabled) {
> val sum = calcChecksum(b.chunks(0))
> if (sum != checksums(pid)) {
>   throw new SparkException(s"corrupt remote block $pieceId of 
> $broadcastId:" +
> s" $sum != ${checksums(pid)}")
> }
>   }
>   // We found the block from remote executors/driver's 
> BlockManager, so put the block
>   // in this executor's BlockManager.
>   if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, 
> tellMaster = true)) {
> throw new SparkException(
>   s"Failed to store $pieceId of $broadcastId in local 
> BlockManager")
>   }
>   blocks(pid) = b
> case None =>
>   throw new SparkException(s"Failed to get $pieceId of 
> $broadcastId")
>   }
>   }
> {code}
> The thing these block manager methods have in common is that they bypass the 
> encryption code; so broadcast data is stored unencrypted in the block 
> manager, causing unencrypted data to be written to disk if those blocks need 
> to be evicted from memory.
> The correct fix here is actually not to change {{TorrentBroadcast}}, but to 
> fix the block manager so that:
> - data stored in memory is not encrypted
> - data written to disk is encrypted
> This would simplify the code paths that use BlockManager / SerializerManager 
> APIs (e.g. see SPARK-19520), but requires some tricky changes inside the 
> BlockManager to still be able to use file channels to avoid reading whole 
> blocks back into memory so they can be decrypted.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19524) newFilesOnly does not work according to docs.

2017-02-08 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15858819#comment-15858819
 ] 

Genmao Yu commented on SPARK-19524:
---

The current implementation clears old time-to-files mappings based on 
{{minRememberDurationS}}.
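
A much-simplified sketch of that behaviour (not FileInputDStream's actual code; the names and the map are assumptions): files older than the remember window are ignored, and old time-to-files entries are eventually cleared.

{code}
import scala.collection.mutable

val minRememberDurationMs = 60000L
val timeToFiles = mutable.Map[Long, Seq[String]]()

// A file is considered "new" only if its modification time falls inside the window.
def isNewFile(modTime: Long, currentTime: Long): Boolean =
  modTime >= currentTime - minRememberDurationMs

// Old batch-time -> files mappings are dropped once they leave the window.
def clearOldMetadata(currentTime: Long): Unit =
  timeToFiles.retain((batchTime, _) => batchTime >= currentTime - minRememberDurationMs)
{code}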

> newFilesOnly does not work according to docs. 
> --
>
> Key: SPARK-19524
> URL: https://issues.apache.org/jira/browse/SPARK-19524
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.0.2
>Reporter: Egor Pahomov
>
> The docs say:
> newFilesOnly
> Should process only new files and ignore existing files in the directory
> It's not working. 
> http://stackoverflow.com/questions/29852249/how-spark-streaming-identifies-new-files
>  says that it shouldn't work as expected. 
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
>  is not clear at all about what the code tries to do



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19482) Fail it if 'spark.master' is set with different value

2017-02-06 Thread Genmao Yu (JIRA)
Genmao Yu created SPARK-19482:
-

 Summary: Fail it if 'spark.master' is set with different value
 Key: SPARK-19482
 URL: https://issues.apache.org/jira/browse/SPARK-19482
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.1.0, 2.0.2
Reporter: Genmao Yu


First, there is no need to set 'spark.master' multiple times with different 
values. Second, users may set a 'spark.master' in code that differs from the one 
passed to the `spark-submit` command, which is confusing. So, we should check 
whether 'spark.master' already exists in the settings and whether the previous 
value matches the current one, and throw an IllegalArgumentException when the 
previous value differs from the current value.
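
A rough sketch of the intended check, using a plain concurrent map in place of SparkConf's internal settings (the method name and the map are assumptions, not Spark code):

{code}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.concurrent.{Map => CMap}
import scala.collection.JavaConverters._

def setMaster(settings: CMap[String, String], value: String): Unit = {
  settings.putIfAbsent("spark.master", value) match {
    case Some(existing) if existing != value =>
      throw new IllegalArgumentException(
        s"spark.master was already set to '$existing'; refusing to change it to '$value'")
    case _ => // first time, or the same value: nothing to do
  }
}

val settings: CMap[String, String] = new ConcurrentHashMap[String, String]().asScala
setMaster(settings, "yarn")
setMaster(settings, "yarn")        // fine: same value
// setMaster(settings, "local[*]") // would throw IllegalArgumentException
{code}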



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19451) Long values in Window function

2017-02-06 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853773#comment-15853773
 ] 

Genmao Yu commented on SPARK-19451:
---

[~jchamp] I have taken a fast look through the code, and did not find any 
strong point to set the type of index as Int. In the window function, there is 
really a underlying integer overflow issue. I have make a pull request 
(https://github.com/apache/spark/pull/16818), and any suggestion is appreciated.

> Long values in Window function
> --
>
> Key: SPARK-19451
> URL: https://issues.apache.org/jira/browse/SPARK-19451
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.1, 2.0.2
>Reporter: Julien Champ
>
> Hi there,
> there seems to be a major limitation in spark window functions and 
> rangeBetween method.
> If I have the following code :
> {code:title=Exemple |borderStyle=solid}
> val tw =  Window.orderBy("date")
>   .partitionBy("id")
>   .rangeBetween( from , 0)
> {code}
> Everything seems OK as long as the *from* value is not too large... even though 
> the rangeBetween() method supports Long parameters.
> But if I set *from* to *-216000L*, it does not work!
> It is probably related to this part of code in the between() method, of the 
> WindowSpec class, called by rangeBetween()
> {code:title=between() method|borderStyle=solid}
> val boundaryStart = start match {
>   case 0 => CurrentRow
>   case Long.MinValue => UnboundedPreceding
>   case x if x < 0 => ValuePreceding(-start.toInt)
>   case x if x > 0 => ValueFollowing(start.toInt)
> }
> {code}
> ( look at this *.toInt* )
> Does anybody know if there's a way to solve / patch this behavior?
> Any help will be appreciated
> Thx
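
Illustrative only (these are tiny stand-ins, not Spark's real window frame classes): the direction of the fix is to keep the boundary value as a Long rather than calling {{.toInt}}, so large negative *from* values are not truncated.

{code}
sealed trait FrameBoundary
case object CurrentRow extends FrameBoundary
case object UnboundedPreceding extends FrameBoundary
case class ValuePreceding(offset: Long) extends FrameBoundary
case class ValueFollowing(offset: Long) extends FrameBoundary

def boundaryStart(start: Long): FrameBoundary = start match {
  case 0L => CurrentRow
  case Long.MinValue => UnboundedPreceding
  case x if x < 0 => ValuePreceding(-x) // no .toInt, so no Int overflow
  case x => ValueFollowing(x)
}
{code}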



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19451) Long values in Window function

2017-02-06 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853773#comment-15853773
 ] 

Genmao Yu edited comment on SPARK-19451 at 2/6/17 9:58 AM:
---

[~jchamp] I have taken a fast look through the code, and did not find any 
strong point to set the type of index as Int. In the window function, there is 
really a underlying integer overflow issue. I  made a pull request 
(https://github.com/apache/spark/pull/16818), and any suggestion is appreciated.


was (Author: unclegen):
[~jchamp] I have taken a fast look through the code, and did not find any 
strong point to set the type of index as Int. In the window function, there is 
really a underlying integer overflow issue. I have make a pull request 
(https://github.com/apache/spark/pull/16818), and any suggestion is appreciated.

> Long values in Window function
> --
>
> Key: SPARK-19451
> URL: https://issues.apache.org/jira/browse/SPARK-19451
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.1, 2.0.2
>Reporter: Julien Champ
>
> Hi there,
> there seems to be a major limitation in spark window functions and 
> rangeBetween method.
> If I have the following code :
> {code:title=Exemple |borderStyle=solid}
> val tw =  Window.orderBy("date")
>   .partitionBy("id")
>   .rangeBetween( from , 0)
> {code}
> Everything seems OK as long as the *from* value is not too large... even though 
> the rangeBetween() method supports Long parameters.
> But if I set *from* to *-216000L*, it does not work!
> It is probably related to this part of code in the between() method, of the 
> WindowSpec class, called by rangeBetween()
> {code:title=between() method|borderStyle=solid}
> val boundaryStart = start match {
>   case 0 => CurrentRow
>   case Long.MinValue => UnboundedPreceding
>   case x if x < 0 => ValuePreceding(-start.toInt)
>   case x if x > 0 => ValueFollowing(start.toInt)
> }
> {code}
> ( look at this *.toInt* )
> Does anybody know if there's a way to solve / patch this behavior?
> Any help will be appreciated
> Thx



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19451) Long values in Window function

2017-02-05 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853434#comment-15853434
 ] 

Genmao Yu commented on SPARK-19451:
---

Good catch! I will dig deeply into code and fix it if it is really a bug.

> Long values in Window function
> --
>
> Key: SPARK-19451
> URL: https://issues.apache.org/jira/browse/SPARK-19451
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.1, 2.0.2
>Reporter: Julien Champ
>
> Hi there,
> there seems to be a major limitation in spark window functions and 
> rangeBetween method.
> If I have the following code :
> {code:title=Exemple |borderStyle=solid}
> val tw =  Window.orderBy("date")
>   .partitionBy("id")
>   .rangeBetween( from , 0)
> {code}
> Everything seems OK as long as the *from* value is not too large... even though 
> the rangeBetween() method supports Long parameters.
> But if I set *from* to *-216000L*, it does not work!
> It is probably related to this part of code in the between() method, of the 
> WindowSpec class, called by rangeBetween()
> {code:title=between() method|borderStyle=solid}
> val boundaryStart = start match {
>   case 0 => CurrentRow
>   case Long.MinValue => UnboundedPreceding
>   case x if x < 0 => ValuePreceding(-start.toInt)
>   case x if x > 0 => ValueFollowing(start.toInt)
> }
> {code}
> ( look at this *.toInt* )
> Does anybody know if there's a way to solve / patch this behavior?
> Any help will be appreciated
> Thx



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19407) defaultFS is used FileSystem.get instead of getting it from uri scheme

2017-02-05 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853426#comment-15853426
 ] 

Genmao Yu commented on SPARK-19407:
---

[~aassudani] Are you still working on this? As this issue is clear and easy to 
fix, I can make a PR later if you are too busy to work on it.

> defaultFS is used FileSystem.get instead of getting it from uri scheme
> --
>
> Key: SPARK-19407
> URL: https://issues.apache.org/jira/browse/SPARK-19407
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Amit Assudani
>  Labels: checkpoint, filesystem, starter, streaming
>
> Caused by: java.lang.IllegalArgumentException: Wrong FS: 
> s3a://**/checkpoint/7b2231a3-d845-4740-bfa3-681850e5987f/metadata,
>  expected: file:///
>   at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>   at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>   at 
> org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.(StreamExecution.scala:100)
>   at 
> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>   at 
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
> Can easily replicate on spark standalone cluster by providing checkpoint 
> location uri scheme anything other than "file://" and not overriding in 
> config.
> WorkAround  --conf spark.hadoop.fs.defaultFS=s3a://somebucket or set it in 
> sparkConf or spark-default.conf



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19147) netty throw NPE

2017-01-24 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837333#comment-15837333
 ] 

Genmao Yu commented on SPARK-19147:
---

After digging into the code, this issue may occur when an executor is exiting 
while some tasks are still running. There are several scenarios in which this 
NPE can be thrown:

1. The AM re-registers after a failure, which resets the state of 
CoarseGrainedSchedulerBackend to its initial state, and then the old executors 
are removed. This only occurs in yarn-client mode.
2. The Master may remove an executor when a worker is lost while some tasks 
are still running.
3. Possibly other scenarios I am not aware of.

As far as I know, this NPE harms nothing, but it may confuse users. Maybe we 
should improve the exception message to tell users it is safe. What is your 
opinion? [~zsxwing]

> netty throw NPE
> ---
>
> Key: SPARK-19147
> URL: https://issues.apache.org/jira/browse/SPARK-19147
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: cen yuhai
>
> {code}
> 17/01/10 19:17:20 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) 
> from bigdata-hdp-apache1828.xg01.diditaxi.com:7337
> java.lang.NullPointerException: group
>   at io.netty.bootstrap.AbstractBootstrap.group(AbstractBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:203)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:354)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.findNextInnerJoinRows$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
>   at 
> org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:138)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
>   at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
>   at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>   at 
>

[jira] [Comment Edited] (SPARK-19147) netty throw NPE

2017-01-24 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837333#comment-15837333
 ] 

Genmao Yu edited comment on SPARK-19147 at 1/25/17 7:39 AM:


After digging into the code, this issue may occur when an executor is exiting 
while some tasks are still running. There are several scenarios in which this 
NPE can be thrown:

1. The AM re-registers after a failure, which resets the state of 
CoarseGrainedSchedulerBackend to its initial state, and then the old executors 
are removed. This only occurs in yarn-client mode.
2. The Master may remove an executor when a worker is lost while some tasks 
are still running.
3. Possibly other scenarios I am not aware of.

As far as I know, this NPE harms nothing, but it may confuse users. Maybe we 
should improve the exception message to tell users it is safe. What is your 
opinion? [~zsxwing]


was (Author: unclegen):
After dig into code, this issue may occurs when executor is existing and at the 
same time some tasks are running. There are some scenes in which this NPE will 
be thrown:

1. AM re-registers after a failure, and this will resets the state of 
CoarseGrainedSchedulerBackend to the initial state, then old executors will be 
removed. This will only occurs in yarn-client mode.
2. Master may will remove executor when a work is lost, and at the same time 
some tasks may be running.
3. Some other scenes i do not know.

As for as I know, this NPE harms nothing, but may confuse users. Maybe, we 
should improve this exception message to tell users it is safe. What is your 
opinion? [~zsxwing]

> netty throw NPE
> ---
>
> Key: SPARK-19147
> URL: https://issues.apache.org/jira/browse/SPARK-19147
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: cen yuhai
>
> {code}
> 17/01/10 19:17:20 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) 
> from bigdata-hdp-apache1828.xg01.diditaxi.com:7337
> java.lang.NullPointerException: group
>   at io.netty.bootstrap.AbstractBootstrap.group(AbstractBootstrap.java:80)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:203)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:181)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>   at 
> org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:354)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.findNextInnerJoinRows$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
>   at 
> org.apache.spark.sql.execution.columnar.InMemoryRelation$$

[jira] [Commented] (SPARK-19354) Killed tasks are getting marked as FAILED

2017-01-24 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837048#comment-15837048
 ] 

Genmao Yu commented on SPARK-19354:
---

IMHO, the killed tasks will eventually end up failed anyway, so there is no 
strong reason to separate KILLED from FAILED.

> Killed tasks are getting marked as FAILED
> -
>
> Key: SPARK-19354
> URL: https://issues.apache.org/jira/browse/SPARK-19354
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Reporter: Devaraj K
>Priority: Minor
>
> When we enable speculation, we can see multiple attempts running for the same 
> task when the first attempt's progress is slow. If any of the task attempts 
> succeeds, the other attempts are killed; while being killed, those attempts 
> get marked as failed due to the error below. We need to handle this error and 
> mark such attempts as KILLED instead of FAILED.
> ||93  ||214   ||1 (speculative)   ||FAILED||ANY   ||1 / 
> xx.xx.xx.x2
> stdout
> stderr
> ||2017/01/24 10:30:44 ||0.2 s ||0.0 B / 0 ||8.0 KB / 400  
> ||java.io.IOException: Failed on local exception: 
> java.nio.channels.ClosedByInterruptException; Host Details : local host is: 
> "node2/xx.xx.xx.x2"; destination host is: "node1":9000; 
> +details||
> {code:xml}
> 17/01/23 23:54:32 INFO Executor: Executor is trying to kill task 93.1 in 
> stage 1.0 (TID 214)
> 17/01/23 23:54:32 INFO FileOutputCommitter: File Output Committer Algorithm 
> version is 1
> 17/01/23 23:54:32 ERROR Executor: Exception in task 93.1 in stage 1.0 (TID 
> 214)
> java.io.IOException: Failed on local exception: 
> java.nio.channels.ClosedByInterruptException; Host Details : local host is: 
> "stobdtserver3/10.224.54.70"; destination host is: "stobdtserver2":9000; 
>   at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>   at com.sun.proxy.$Proxy17.create(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy18.create(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1648)
>   at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
>   at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:804)
>   at 
> org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
>   at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1133)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1124)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
>   at org.apache.spark.scheduler.Task.run(Task.scala:114)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedByInterruptException
>   at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibl

[jira] [Commented] (SPARK-10141) Number of tasks on executors still become negative after failures

2017-01-24 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837043#comment-15837043
 ] 

Genmao Yu commented on SPARK-10141:
---

I think this was fixed in https://github.com/apache/spark/pull/14969, and the 
fix version is Spark 2.0.0.

> Number of tasks on executors still become negative after failures
> -
>
> Key: SPARK-10141
> URL: https://issues.apache.org/jira/browse/SPARK-10141
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 1.5.0
>Reporter: Joseph K. Bradley
>Priority: Minor
> Attachments: Screen Shot 2015-08-20 at 3.14.49 PM.png
>
>
> I hit this failure when running LDA on EC2 (after I made the model size 
> really big).
> I was using the LDAExample.scala code on an EC2 cluster with 16 workers 
> (r3.2xlarge), on a Wikipedia dataset:
> {code}
> Training set size (documents) 4534059
> Vocabulary size (terms)   1
> Training set size (tokens)895575317
> EM optimizer
> 1K topics
> {code}
> Failure message:
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 55 in 
> stage 22.0 failed 4 times, most recent failure: Lost task 55.3 in stage 22.0 
> (TID 2881, 10.0.202.128): java.io.IOException: Failed to connect to 
> /10.0.202.128:54740
> at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>   at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: /10.0.202.128:54740
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>   at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   ... 1 more
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1267)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1255)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1254)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1254)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:684)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1480)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1442)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1431)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.

[jira] [Commented] (SPARK-19356) Number of active tasks is negative even when there is no failed executor

2017-01-24 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837041#comment-15837041
 ] 

Genmao Yu commented on SPARK-19356:
---

I think this was fixed in https://github.com/apache/spark/pull/14969, and the 
fix version is Spark 2.0.0.

> Number of active tasks is negative even when there is no failed executor
> 
>
> Key: SPARK-19356
> URL: https://issues.apache.org/jira/browse/SPARK-19356
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 1.6.0
>Reporter: Lan Jiang
>Priority: Minor
> Attachments: Screen Shot 2017-01-24 at 4.39.09 PM.png
>
>
> The active tasks number in the UI is negative, even when there is no failed 
> executor. It is different from 
> https://issues.apache.org/jira/browse/SPARK-10141, which relates to failed 
> executor



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-18563) mapWithState: initialState should have a timeout setting per record

2017-01-24 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-18563:
--
Comment: was deleted

(was: I do not know is there any plan to add new feature to DStreams? Maybe, we 
should focus on Structured Streaming?

[~zsxwing])

> mapWithState: initialState should have a timeout setting per record
> ---
>
> Key: SPARK-18563
> URL: https://issues.apache.org/jira/browse/SPARK-18563
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Reporter: Daniel Haviv
>
> When passing an initialState for mapWithState, there should be a possibility to 
> set a timeout at the record level.
> If for example mapWithState is configured with a 48H timeout, loading an 
> initialState will cause the state to bloat and hold 96H of data and then 
> release 48H of data at once.
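
For context, a hedged sketch of the current API surface (names and values are illustrative): the timeout on a StateSpec is a single global setting, so there is indeed no per-record knob today.

{code}
// Sketch only, not the proposed per-record API: today the timeout applies to the
// whole StateSpec, and (per the report above) entries loaded via initialState
// effectively share one timeout clock starting at load time.
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Minutes, State, StateSpec}

def buildSpec(sc: SparkContext): StateSpec[String, Int, Int, (String, Int)] = {
  val initialRDD = sc.parallelize(Seq(("a", 1), ("b", 2)))   // pre-loaded state
  val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
    val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
    if (!state.isTimingOut()) state.update(sum)
    (key, sum)
  }
  StateSpec
    .function(mappingFunc)
    .initialState(initialRDD)
    .timeout(Minutes(48 * 60))   // one global 48H timeout; no per-record setting exists
}
{code}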



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-19343) Do once optimistic checkpoint before stop

2017-01-24 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu closed SPARK-19343.
-
Resolution: Won't Fix

> Do once optimistic checkpoint before stop
> -
>
> Key: SPARK-19343
> URL: https://issues.apache.org/jira/browse/SPARK-19343
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Genmao Yu
>
> When a streaming job restarts from a checkpoint, it rebuilds several batches 
> until it finds the latest checkpointed RDD. So we could do one optimistic 
> checkpoint just before stopping, to reduce unnecessary recomputation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18563) mapWithState: initialState should have a timeout setting per record

2017-01-22 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833416#comment-15833416
 ] 

Genmao Yu edited comment on SPARK-18563 at 1/22/17 9:35 AM:


I do not know whether there is any plan to add new features to DStreams. Maybe 
we should focus on Structured Streaming instead?

[~zsxwing]


was (Author: unclegen):
I do not know is there any plan to add new feature to DStreams? Maybe, we 
should focus on Structured Streaming?

> mapWithState: initialState should have a timeout setting per record
> ---
>
> Key: SPARK-18563
> URL: https://issues.apache.org/jira/browse/SPARK-18563
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Reporter: Daniel Haviv
>
> When passing an initialState for mapWithState, there should be a possibility to 
> set a timeout at the record level.
> If for example mapWithState is configured with a 48H timeout, loading an 
> initialState will cause the state to bloat and hold 96H of data and then 
> release 48H of data at once.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18563) mapWithState: initialState should have a timeout setting per record

2017-01-22 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833416#comment-15833416
 ] 

Genmao Yu commented on SPARK-18563:
---

I do not know whether there is any plan to add new features to DStreams. Maybe 
we should focus on Structured Streaming instead?

> mapWithState: initialState should have a timeout setting per record
> ---
>
> Key: SPARK-18563
> URL: https://issues.apache.org/jira/browse/SPARK-18563
> Project: Spark
>  Issue Type: Improvement
>  Components: DStreams
>Reporter: Daniel Haviv
>
> When passing an initialState for mapWithState, there should be a possibility to 
> set a timeout at the record level.
> If for example mapWithState is configured with a 48H timeout, loading an 
> initialState will cause the state to bloat and hold 96H of data and then 
> release 48H of data at once.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18839) Executor is active on web, but actually is dead

2017-01-22 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833409#comment-15833409
 ] 

Genmao Yu commented on SPARK-18839:
---

Sorry, I do not think this is a bug.

> Executor is active on web, but actually is dead
> ---
>
> Key: SPARK-18839
> URL: https://issues.apache.org/jira/browse/SPARK-18839
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: meiyoula
>Priority: Minor
>
> When a container is preempted, the AM finds it is completed and the driver 
> removes its block manager. But the executor actually dies only after a few 
> seconds; during this period it updates blocks and re-registers the block 
> manager, so the executors page shows the executor as active.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18805) InternalMapWithStateDStream make java.lang.StackOverflowError

2017-01-22 Thread Genmao Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833377#comment-15833377
 ] 

Genmao Yu commented on SPARK-18805:
---

+1 to {{That should be not an infinite loop. The time is different on each 
call.}}

That said, there is indeed a potential {{StackOverflowError}} issue, but in my 
understanding it is hard to trigger. Could you please provide your settings for 
checkpointDuration and batchDuration?
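
For clarity, a hedged sketch of where those two durations are typically set (app name, paths, and values are illustrative, not the reporter's actual configuration):

{code}
// Illustrative only: batchDuration is fixed when the StreamingContext is built,
// and the checkpoint interval of a stateful stream can be overridden per DStream.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("mapWithState-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))   // batchDuration = 10s
ssc.checkpoint("/tmp/checkpoint")                    // checkpoint directory

// After building the mapWithState stream, its checkpoint interval could be set:
// stateStream.checkpoint(Seconds(100))              // checkpointDuration = 100s
{code}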

> InternalMapWithStateDStream make java.lang.StackOverflowError 
> --
>
> Key: SPARK-18805
> URL: https://issues.apache.org/jira/browse/SPARK-18805
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 1.6.3, 2.0.2
> Environment: mesos
>Reporter: etienne
>
> When loading InternalMapWithStateDStream from a checkpoint, if isValidTime is 
> true and there is no generatedRDD for the given time, there is an infinite 
> loop:
> 1) compute is called on InternalMapWithStateDStream
> 2) InternalMapWithStateDStream tries to generate the previousRDD
> 3) the stream looks in generatedRDD to see whether the RDD has already been 
> generated for the given time
> 4) it does not find the RDD, so it checks whether the time is valid
> 5) if the time is valid, compute is called on InternalMapWithStateDStream
> 6) restart from 1)
> Here is the exception that illustrates this error:
> {code}
> Exception in thread "streaming-start" java.lang.StackOverflowError
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>   at scala.Option.orElse(Option.scala:289)
>   at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>   at 
> org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>   at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>   at scala.Option.orElse(Option.scala:289)
>   at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>   at 
> org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
>   at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> {code}
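
Restated as a generic, hedged sketch (not Spark's actual code), the cycle in steps 1)-6) above looks like this: after a checkpoint restore the generated-RDD map is empty and isValidTime never rejects a time, so each compute recurses into getOrCompute for an earlier time and the recursion depth grows until a StackOverflowError.

{code}
// Generic sketch only; names stand in for the Spark internals mentioned above.
import scala.collection.mutable

object MissingRddRecursion {
  private val slide = 10L
  private val generated = mutable.Map[Long, String]()      // stands in for generatedRDDs

  private def isValidTime(t: Long): Boolean = true         // never rejects a time

  def getOrCompute(t: Long): Option[String] =
    generated.get(t).orElse(if (isValidTime(t)) Some(compute(t)) else None)

  def compute(t: Long): String =
    getOrCompute(t - slide).getOrElse("") + s" state@$t"   // needs the previous RDD first

  def main(args: Array[String]): Unit = {
    getOrCompute(1000000L)                                 // throws StackOverflowError
  }
}
{code}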



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18116) spark streaming ui show 0 events when recovering from checkpoint

2017-01-20 Thread Genmao Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Genmao Yu updated SPARK-18116:
--
Target Version/s:   (was: 2.0.3, 2.1.1)
   Fix Version/s: (was: 2.1.1)
  (was: 2.0.3)

> spark streaming ui show 0 events when recovering from checkpoint
> 
>
> Key: SPARK-18116
> URL: https://issues.apache.org/jira/browse/SPARK-18116
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 1.6.1
> Environment: jdk1.8.0_77  Red Hat 4.4.7-11
>Reporter: liujianhui
>Priority: Minor
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Run a streaming application that sources from Kafka. Many batches are queued 
> in the job list before the application is stopped. After stopping the 
> application and then restarting it from the checkpoint file, the Spark UI 
> shows an input size of 0 events for the queued batches that were stored in 
> the checkpoint file, for example:
> || Batch Time || Input Size || Scheduling Delay || Processing Time || Total Delay || Output Ops: Succeeded/Total ||
> | 2016/10/26 19:02:15 | 0 events | 1.1 h | 13 s | 1.1 h | |
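
For reference, a hedged sketch of the recovery path described above: restarting the application from its checkpoint directory via StreamingContext.getOrCreate (paths, app name, and durations are illustrative):

{code}
// Illustrative only: on restart, getOrCreate rebuilds the context (including the
// queued batches) from the checkpoint directory instead of calling createContext.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-streaming-sketch")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // ... define the Kafka input stream and the processing graph here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
ssc.start()
ssc.awaitTermination()
{code}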



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


