[jira] [Commented] (SPARK-13510) Shuffle may throw FetchFailedException: Direct buffer memory

2019-04-25 Thread Mike Chan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-13510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825798#comment-16825798
 ] 

Mike Chan commented on SPARK-13510:
---

Thanks man

> Shuffle may throw FetchFailedException: Direct buffer memory
> 
>
> Key: SPARK-13510
> URL: https://issues.apache.org/jira/browse/SPARK-13510
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Hong Shen
>Priority: Major
> Attachments: spark-13510.diff
>
>
> In our cluster, when I test Spark 1.6.0 with a SQL query, it throws an 
> exception and fails.
> {code}
> 16/02/17 15:36:03 INFO storage.ShuffleBlockFetcherIterator: Sending request 
> for 1 blocks (915.4 MB) from 10.196.134.220:7337
> 16/02/17 15:36:03 INFO shuffle.ExternalShuffleClient: External shuffle fetch 
> from 10.196.134.220:7337 (executor id 122)
> 16/02/17 15:36:03 INFO client.TransportClient: Sending fetch chunk request 0 
> to /10.196.134.220:7337
> 16/02/17 15:36:36 WARN server.TransportChannelHandler: Exception in 
> connection from /10.196.134.220:7337
> java.lang.OutOfMemoryError: Direct buffer memory
>   at java.nio.Bits.reserveMemory(Bits.java:658)
>   at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
>   at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>   at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645)
>   at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228)
>   at io.netty.buffer.PoolArena.allocate(PoolArena.java:212)
>   at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
>   at 
> io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
>   at 
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
>   at 
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
>   at 
> io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
>   at 
> io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at java.lang.Thread.run(Thread.java:744)
> 16/02/17 15:36:36 ERROR client.TransportResponseHandler: Still have 1 
> requests outstanding when connection from /10.196.134.220:7337 is closed
> 16/02/17 15:36:36 ERROR shuffle.RetryingBlockFetcher: Failed to fetch block 
> shuffle_3_81_2, and will not retry (0 retries)
> {code}
>   The reason is that when shuffling a big block (like 1 GB), the task will 
> allocate the same amount of direct memory, so it easily throws 
> "FetchFailedException: Direct buffer memory".
>   If I add -Dio.netty.noUnsafe=true to spark.executor.extraJavaOptions, it 
> throws
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at 
> io.netty.buffer.PoolArena$HeapArena.newUnpooledChunk(PoolArena.java:607)
> at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
> {code}
>   
>   In the MapReduce shuffle, the framework first checks whether the block can 
> be cached in memory, but Spark doesn't. 
>   If the block is larger than what we can cache in memory, we should write it 
> to disk.
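For anyone hitting this on newer releases, here is a minimal sketch of the 
usual mitigation, assuming a Spark version (2.3 or later) where 
spark.maxRemoteBlockSizeFetchToMem is available; this is not the patch attached 
to this issue, and the values are illustrative only.

{code:java}
import org.apache.spark.SparkConf

// Illustrative thresholds only; tune them to the workload.
val conf = new SparkConf()
  // Stream remote shuffle blocks larger than 200 MB to disk instead of
  // buffering them in (direct) memory.
  .set("spark.maxRemoteBlockSizeFetchToMem", "200m")
  // Give Netty's direct buffers explicit headroom on the executors.
  .set("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=2g")
{code}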



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27557) Add copybutton to spark Python API docs for easier copying of code-blocks

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27557:


Assignee: Apache Spark

> Add copybutton to spark Python API docs for easier copying of code-blocks
> -
>
> Key: SPARK-27557
> URL: https://issues.apache.org/jira/browse/SPARK-27557
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation, PySpark
>Affects Versions: 2.4.2
>Reporter: Sangram G
>Assignee: Apache Spark
>Priority: Minor
>  Labels: Documentation, PySpark
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> Add a non-intrusive button to the Python API documentation that removes the 
> ">>>" prompts and code output, for easier copying of code.
> For example, the code snippet below is difficult to copy because of the ">>>" 
> prompts:
> {code}
> >>> l = [('Alice', 1)]
> >>> spark.createDataFrame(l).collect()
> [Row(_1='Alice', _2=1)]
> {code}
> After the copy button in the corner of the code block is pressed, it becomes 
> this, which is easier to copy:
> {code}
> l = [('Alice', 1)]
> spark.createDataFrame(l).collect()
> {code}
> Sample Screenshot for reference: 
> [https://screenshots.firefox.com/ZfBXZrpINKdt9QZg/www269.lunapic.com]
> This can be done simply by adding a copybutton.js script to the 
> python/docs/_static folder and loading it at setup time from 
> python/docs/conf.py.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27557) Add copybutton to spark Python API docs for easier copying of code-blocks

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27557:


Assignee: (was: Apache Spark)

> Add copybutton to spark Python API docs for easier copying of code-blocks
> -
>
> Key: SPARK-27557
> URL: https://issues.apache.org/jira/browse/SPARK-27557
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation, PySpark
>Affects Versions: 2.4.2
>Reporter: Sangram G
>Priority: Minor
>  Labels: Documentation, PySpark
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> Add a non-intrusive button to the Python API documentation that removes the 
> ">>>" prompts and code output, for easier copying of code.
> For example, the code snippet below is difficult to copy because of the ">>>" 
> prompts:
> {code}
> >>> l = [('Alice', 1)]
> >>> spark.createDataFrame(l).collect()
> [Row(_1='Alice', _2=1)]
> {code}
> After the copy button in the corner of the code block is pressed, it becomes 
> this, which is easier to copy:
> {code}
> l = [('Alice', 1)]
> spark.createDataFrame(l).collect()
> {code}
> Sample Screenshot for reference: 
> [https://screenshots.firefox.com/ZfBXZrpINKdt9QZg/www269.lunapic.com]
> This can be done simply by adding a copybutton.js script to the 
> python/docs/_static folder and loading it at setup time from 
> python/docs/conf.py.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27491) SPARK REST API - "org.apache.spark.deploy.SparkSubmit --status" returns empty response! therefore Airflow won't integrate with Spark 2.3.x

2019-04-25 Thread t oo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825804#comment-16825804
 ] 

t oo commented on SPARK-27491:
--

My current workaround to get Airflow to integrate with a Spark 2.3 cluster is to 
use two versions of Spark on the Airflow machines (Spark 2.3 to submit and 
Spark 2.1 to poll status). :) I had to override Airflow's spark_submit_operator 
to point to the two Spark paths.

> SPARK REST API - "org.apache.spark.deploy.SparkSubmit --status" returns empty 
> response! therefore Airflow won't integrate with Spark 2.3.x
> --
>
> Key: SPARK-27491
> URL: https://issues.apache.org/jira/browse/SPARK-27491
> Project: Spark
>  Issue Type: Bug
>  Components: Java API, Scheduler, Spark Core, Spark Shell, Spark 
> Submit
>Affects Versions: 2.3.3
>Reporter: t oo
>Priority: Major
>
> This issue must have been introduced after Spark 2.1.1, as it works in 
> that version. It affects me in Spark 2.3.3/2.3.0. I am using 
> Spark standalone mode, if that makes a difference.
> See below: Spark 2.3.3 returns an empty response while 2.1.1 returns a response.
>  
> Spark 2.1.1:
> [ec2here@ip-x-y-160-225 ~]$ bash -x /home/ec2here/spark_home1/bin/spark-class 
> org.apache.spark.deploy.SparkSubmit --master spark://domainhere:6066 --status 
> driver-20190417130324-0009
> + export SPARK_HOME=/home/ec2here/spark_home1
> + SPARK_HOME=/home/ec2here/spark_home1
> + '[' -z /home/ec2here/spark_home1 ']'
> + . /home/ec2here/spark_home1/bin/load-spark-env.sh
> ++ '[' -z /home/ec2here/spark_home1 ']'
> ++ '[' -z '' ']'
> ++ export SPARK_ENV_LOADED=1
> ++ SPARK_ENV_LOADED=1
> ++ parent_dir=/home/ec2here/spark_home1
> ++ user_conf_dir=/home/ec2here/spark_home1/conf
> ++ '[' -f /home/ec2here/spark_home1/conf/spark-env.sh ']'
> ++ set -a
> ++ . /home/ec2here/spark_home1/conf/spark-env.sh
> +++ export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk.x86_64
> +++ JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk.x86_64
>  ulimit -n 1048576
> ++ set +a
> ++ '[' -z '' ']'
> ++ ASSEMBLY_DIR2=/home/ec2here/spark_home1/assembly/target/scala-2.11
> ++ ASSEMBLY_DIR1=/home/ec2here/spark_home1/assembly/target/scala-2.10
> ++ [[ -d /home/ec2here/spark_home1/assembly/target/scala-2.11 ]]
> ++ '[' -d /home/ec2here/spark_home1/assembly/target/scala-2.11 ']'
> ++ export SPARK_SCALA_VERSION=2.10
> ++ SPARK_SCALA_VERSION=2.10
> + '[' -n /usr/lib/jvm/jre-1.8.0-openjdk.x86_64 ']'
> + RUNNER=/usr/lib/jvm/jre-1.8.0-openjdk.x86_64/bin/java
> + '[' -d /home/ec2here/spark_home1/jars ']'
> + SPARK_JARS_DIR=/home/ec2here/spark_home1/jars
> + '[' '!' -d /home/ec2here/spark_home1/jars ']'
> + LAUNCH_CLASSPATH='/home/ec2here/spark_home1/jars/*'
> + '[' -n '' ']'
> + [[ -n '' ]]
> + CMD=()
> + IFS=
> + read -d '' -r ARG
> ++ build_command org.apache.spark.deploy.SparkSubmit --master 
> spark://domainhere:6066 --status driver-20190417130324-0009
> ++ /usr/lib/jvm/jre-1.8.0-openjdk.x86_64/bin/java -Xmx128m -cp 
> '/home/ec2here/spark_home1/jars/*' org.apache.spark.launcher.Main 
> org.apache.spark.deploy.SparkSubmit --master spark://domainhere:6066 --status 
> driver-20190417130324-0009
> + CMD+=("$ARG")
> + IFS=
> + read -d '' -r ARG
> + CMD+=("$ARG")
> + IFS=
> + read -d '' -r ARG
> + CMD+=("$ARG")
> + IFS=
> + read -d '' -r ARG
> + CMD+=("$ARG")
> + IFS=
> + read -d '' -r ARG
> + CMD+=("$ARG")
> + IFS=
> + read -d '' -r ARG
> + CMD+=("$ARG")
> + IFS=
> + read -d '' -r ARG
> + CMD+=("$ARG")
> + IFS=
> + read -d '' -r ARG
> + CMD+=("$ARG")
> + IFS=
> + read -d '' -r ARG
> + CMD+=("$ARG")
> + IFS=
> + read -d '' -r ARG
> ++ printf '%d\0' 0
> + CMD+=("$ARG")
> + IFS=
> + read -d '' -r ARG
> + COUNT=10
> + LAST=9
> + LAUNCHER_EXIT_CODE=0
> + [[ 0 =~ ^[0-9]+$ ]]
> + '[' 0 '!=' 0 ']'
> + CMD=("${CMD[@]:0:$LAST}")
> + exec /usr/lib/jvm/jre-1.8.0-openjdk.x86_64/bin/java -cp 
> '/home/ec2here/spark_home1/conf/:/home/ec2here/spark_home1/jars/*' -Xmx2048m 
> org.apache.spark.deploy.SparkSubmit --master spark://domainhere:6066 --status 
> driver-20190417130324-0009
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> 19/04/17 14:03:27 INFO RestSubmissionClient: Submitting a request for the 
> status of submission driver-20190417130324-0009 in spark://domainhere:6066.
> 19/04/17 14:03:28 INFO RestSubmissionClient: Server responded with 
> SubmissionStatusResponse:
> {
>  "action" : "SubmissionStatusResponse",
>  "driverState" : "FAILED",
>  "serverSparkVersion" : "2.3.3",
>  "submissionId" : "driver-20190417130324-0009",
>  "success" : true,
>  "workerHostPort" : "x.y.211.40:11819",
>  "workerId" : "worker-20190417115840-x.y.211.40-11819"
> }
> [ec2here@ip-x-y-160-225 ~]$
>  
> Spark 2.3.3:
> [ec2here@ip-x-y-160-225 ~]

[jira] [Assigned] (SPARK-27563) automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27563:


Assignee: Apache Spark  (was: Wenchen Fan)

> automatically get the latest Spark versions in 
> HiveExternalCatalogVersionsSuite
> ---
>
> Key: SPARK-27563
> URL: https://issues.apache.org/jira/browse/SPARK-27563
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Wenchen Fan
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27563) automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27563:


Assignee: Wenchen Fan  (was: Apache Spark)

> automatically get the latest Spark versions in 
> HiveExternalCatalogVersionsSuite
> ---
>
> Key: SPARK-27563
> URL: https://issues.apache.org/jira/browse/SPARK-27563
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26268) Decouple shuffle data from Spark deployment

2019-04-25 Thread Chenzhao Guo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825814#comment-16825814
 ] 

Chenzhao Guo commented on SPARK-26268:
--

Actually this can be resolved in SPARK-25299; there is no need to treat this as 
a separate issue.
We can just modify the line `val fileLost = ...` in DAGScheduler to take remote 
shuffle into account as well.
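For context, a self-contained sketch of that idea; this is not the actual 
DAGScheduler source, and "remoteShuffleEnabled" is a hypothetical flag standing 
in for however a remote shuffle implementation would advertise that its data 
survives executor loss.

{code:java}
// Sketch only: shuffle files on a lost executor are treated as lost unless an
// external shuffle service still serves them; the suggestion is to also treat
// them as safe when shuffle data lives in remote storage.
def shuffleFilesLost(
    workerLost: Boolean,
    externalShuffleServiceEnabled: Boolean,
    remoteShuffleEnabled: Boolean): Boolean = {
  val fileLost = workerLost || !externalShuffleServiceEnabled
  fileLost && !remoteShuffleEnabled
}
{code}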

> Decouple shuffle data from Spark deployment
> ---
>
> Key: SPARK-26268
> URL: https://issues.apache.org/jira/browse/SPARK-26268
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.4.0
>Reporter: Ben Sidhom
>Priority: Major
>
> Right now the batch scheduler assumes that shuffle data is tied to executors. 
> As a result, when an executor is lost, any map tasks that ran on that 
> executor are rescheduled unless the "external" shuffle service is being used. 
> Note that this service is only external in the sense that it does not live 
> within executors themselves; its implementation cannot be swapped out and it 
> is assumed to speak the BlockManager language.
> The following changes would facilitate external shuffle (see SPARK-25299 for 
> motivation):
>  * Do not rerun map tasks on lost executors when shuffle data is stored 
> externally. For example, this could be determined by a property or by an 
> additional method that all ShuffleManagers implement (see the sketch after 
> this description).
>  * Do not assume that shuffle data is stored in the standard BlockManager 
> format or that a BlockManager is or must be available to ShuffleManagers.
> Note that only the first change is actually required to realize the benefits 
> of remote shuffle implementations as a phony (or null) BlockManager can be 
> used by shuffle implementations.
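As a rough illustration of the first bullet above, a hypothetical marker that a 
ShuffleManager could expose; the trait and method name are invented for the 
sketch and are not an existing Spark API.

{code:java}
// Hypothetical sketch only, not part of Spark.
trait DecoupledShuffleSupport {
  /** True if shuffle blocks are stored outside executors (e.g. in remote
   *  storage), so losing an executor does not lose its map outputs. */
  def shuffleDataOutlivesExecutors: Boolean
}
{code}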



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27331) Schema mismatch using MicroBatchReader with columns pruning

2019-04-25 Thread Kineret (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825874#comment-16825874
 ] 

Kineret commented on SPARK-27331:
-

Setting the schema back to the full schema after every commit solved the issue. 
Thanks!
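For the record, a minimal self-contained sketch of that workaround; Spark's 
reader and schema types are stubbed out with plain Scala stand-ins here, so 
none of the names below are the real MicroBatchReader API.

{code:java}
// Plain-Scala stand-ins, just to show the pattern.
final case class SchemaStub(fields: Seq[String])

class PruningReaderSketch {
  private def createSchema(): SchemaStub = SchemaStub(Seq("Id", "fieldA", "fieldB"))

  var schema: SchemaStub = createSchema()

  // Spark calls this with the pruned column set (column-pruning push-down).
  def pruneColumns(required: SchemaStub): Unit = { schema = required }

  // The workaround: once a micro-batch is committed, reset to the full schema
  // so the next batch is planned against an unpruned readSchema().
  def commit(): Unit = { schema = createSchema() }
}
{code}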

> Schema mismatch using MicroBatchReader with columns pruning
> ---
>
> Key: SPARK-27331
> URL: https://issues.apache.org/jira/browse/SPARK-27331
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.1
> Environment: spark 2.3.1
>Reporter: Kineret
>Priority: Major
>  Labels: datasource
>
> I'm writing a custom Spark streaming source. I want to support column 
> pruning, so I did something like this:
>  
> {code:java}
> class MyMicroBatchReader(...) extends MicroBatchReader with 
> SupportsPushDownRequiredColumns {
>   var schema: StructType = createSchema()
>   def readSchema(): StructType = schema
>   def pruneColumns(requiredSchema: StructType): Unit = {
> schema = requiredSchema
>   }
>   ...
> }
> {code}
>  
> If I run a streaming query that selects some columns, the job fails. For 
> example, running:
> {code:java}
> spark.readStream().format("mysource").load().select("Id").writeStream().format("console").start()
> {code}
> I obtain the following exception (in the second iteration):
> {code:java}
> 18/06/29 15:50:01 ERROR MicroBatchExecution: Query [id = 
> 59c13195-9d63-42c9-8f92-eb9d67e8b26c, runId = 
> 72124019-1ab3-48a9-9503-0cf1c7d26fb9] terminated with error 
> java.lang.AssertionError: assertion failed: Invalid batch: 
> fieldA#0,fieldB#1,fieldC,Id#3,fieldD#4,fieldE#5 != Id#52 at 
> scala.Predef$.assert(Predef.scala:170) at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:417)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:416)
>  at scala.Option.map(Option.scala:146) at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:416)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:414)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>  at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) 
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256) at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:414)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>  at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>  at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> {code}
> Can you please help?

[jira] [Issue Comment Deleted] (SPARK-26688) Provide configuration of initially blacklisted YARN nodes

2019-04-25 Thread Sergey (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey updated SPARK-26688:
---
Comment: was deleted

(was: Hi There!

I'm very glad that the community paid attention to my question. Let me try to 
explain the use case.

There is a 1K-node cluster, and jobs suffer performance degradation because of 
a single node. It's rather hard to convince Cluster Ops to decommission a node 
because of "performance degradation". Imagine 10 dev teams chasing a single ops 
team, sometimes for a valid reason (the node has problems) and sometimes 
because the code has a bug, the data is skewed, or there are spots on the sun. 
We can't just decommission a node because a random dev complains. 

Simple solution:
 * Rerun the failed/delayed job and blacklist the "problematic" node in advance.
 * Report the problem if the job then runs without anomalies. 
 * Ops collect complaints about the node and start to decommission it when a 
"complaint threshold" is reached. It's rather unlikely that many loosely 
coupled teams with loosely coupled jobs complain about a single node. 

Results:
 * Ops are not spammed with random requests from devs.
 * Devs are not blocked by a genuinely bad node.
 * It's very cheap for everyone to "blacklist" a node during job submission 
without doing anything to the node. )

> Provide configuration of initially blacklisted YARN nodes
> -
>
> Key: SPARK-26688
> URL: https://issues.apache.org/jira/browse/SPARK-26688
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Attila Zsolt Piros
>Assignee: Attila Zsolt Piros
>Priority: Major
> Fix For: 3.0.0
>
>
> Introducing new config for initially blacklisted YARN nodes.
> This came up in the apache spark user mailing list: 
> [http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Yarn-is-it-possible-to-manually-blacklist-nodes-before-running-spark-job-td34395.html]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27331) Schema mismatch using MicroBatchReader with columns pruning

2019-04-25 Thread Kineret (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kineret resolved SPARK-27331.
-
Resolution: Invalid

> Schema mismatch using MicroBatchReader with columns pruning
> ---
>
> Key: SPARK-27331
> URL: https://issues.apache.org/jira/browse/SPARK-27331
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.1
> Environment: spark 2.3.1
>Reporter: Kineret
>Priority: Major
>  Labels: datasource
>
> I'm writing a custom Spark streaming source. I want to support column 
> pruning, so I did something like this:
>  
> {code:java}
> class MyMicroBatchReader(...) extends MicroBatchReader with 
> SupportsPushDownRequiredColumns {
>   var schema: StructType = createSchema()
>   def readSchema(): StructType = schema
>   def pruneColumns(requiredSchema: StructType): Unit = {
> schema = requiredSchema
>   }
>   ...
> }
> {code}
>  
> If I run a streaming query that selects some columns, the job fails. For 
> example, running:
> {code:java}
> spark.readStream().format("mysource").load().select("Id").writeStream().format("console").start()
> {code}
> I obtain the following exception (in the second iteration):
> {code:java}
> 18/06/29 15:50:01 ERROR MicroBatchExecution: Query [id = 
> 59c13195-9d63-42c9-8f92-eb9d67e8b26c, runId = 
> 72124019-1ab3-48a9-9503-0cf1c7d26fb9] terminated with error 
> java.lang.AssertionError: assertion failed: Invalid batch: 
> fieldA#0,fieldB#1,fieldC,Id#3,fieldD#4,fieldE#5 != Id#52 at 
> scala.Predef$.assert(Predef.scala:170) at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:417)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:416)
>  at scala.Option.map(Option.scala:146) at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:416)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:414)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>  at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) 
> at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256) at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:414)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>  at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>  at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> {code}
> Can you please help?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---

[jira] [Issue Comment Deleted] (SPARK-26688) Provide configuration of initially blacklisted YARN nodes

2019-04-25 Thread Sergey (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey updated SPARK-26688:
---
Comment: was deleted

(was: Hi Imran,

thanks for your reply.

"Meanwhile devs start to apply this willy-nilly, as these configs tend to just 
keep getting built up over time "

An SLA could mitigate this problem. Every blacklisted node slows a specific job 
down in the long run. In any case, the dev would have to report/communicate 
with ops to resolve the node issue. 

 

"Ideally, blacklisting and speculation should be able to prevent that problem"

We are going to try out speculation, but we are not there yet. )

> Provide configuration of initially blacklisted YARN nodes
> -
>
> Key: SPARK-26688
> URL: https://issues.apache.org/jira/browse/SPARK-26688
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Attila Zsolt Piros
>Assignee: Attila Zsolt Piros
>Priority: Major
> Fix For: 3.0.0
>
>
> Introducing new config for initially blacklisted YARN nodes.
> This came up in the apache spark user mailing list: 
> [http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Yarn-is-it-possible-to-manually-blacklist-nodes-before-running-spark-job-td34395.html]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27564) 'No plan for EventTimeWatermark' error while using structured streaming with column pruning

2019-04-25 Thread Kineret (JIRA)
Kineret created SPARK-27564:
---

 Summary: 'No plan for EventTimeWatermark' error while using 
structured streaming with column pruning
 Key: SPARK-27564
 URL: https://issues.apache.org/jira/browse/SPARK-27564
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.3.1
Reporter: Kineret






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27564) 'No plan for EventTimeWatermark' error while using structured streaming with column pruning

2019-04-25 Thread Kineret (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kineret updated SPARK-27564:

Description: 
I get 'No plan for EventTimeWatermark' error while doing a query with columns 
pruning using structured streaming with a custom data source that implements 
Spark datasource v2.

My data source implementation that handles the schemas includes the following:

{{class MyDataSourceReader extends DataSourceReader with 
SupportsPushDownRequiredColumns \{ var schema: StructType = createSchema() 
override def readSchema(): StructType = schema override def 
pruneColumns(requiredSchema: StructType) = { this.schema = requiredSchema } }}

and then:

{{class MyDataSourceReaderStream extends MyDataSourceReader { ... }}

This is my test code:

{{def x(): Unit = \{ val df1 = 
sparkSession.readStream.format(myV2Source).load() val df2 = df1 
.withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 
30).cast(TimestampType)) .withWatermark("epoch", "1 milliseconds") 
.groupBy(col("epoch"), col("id")).count() val streamingQuery = df2 .writeStream 
.format("console") .trigger(Trigger.ProcessingTime("10 seconds")) 
.outputMode(OutputMode.Append()) .start() streamingQuery.awaitTermination() } }}

I get the following exception:

{{Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds +- Project 
[cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as timestamp) 
AS epoch#201, id#367L] +- DataSourceV2Relation [epoch#320L, id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
 at scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) 
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) }}

Note that in the logical plan I got *DataSourceV2Relation* and not 
*StreamingDataSourceV2Relation* although I use streaming.

> 'No plan for EventTimeWatermark' error while using structured streaming with 
> column pruning
> ---
>
> Key: SPARK-27564
> URL: https://issues.apache.org/jira/browse/SPARK-27564
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Kineret
>Priority: Major
>
> I get 'No plan for EventTimeWatermark' error while doing a query with columns 
> pruning using structured streaming with a custom data source that implements 
> Spark datasource v2.
> My data source implementation that handles the schemas includes the following:
> {{class MyDataSourceReader extends DataSourceReader with 
> SupportsPushDownRequiredColumns \{ var schema: StructType = createSchema() 
> override def readSchema(): StructType = schema override def 
> pruneColumns(requiredSchema: StructType) = { this.schema = requiredSchema } }}
> and then:
> {{class MyDataSourceReaderStream extends MyDataSourceReader { ... }}
> This is my test code:
> {{def x(): Unit = \{ val df1 = 
> sparkSession.readStream.format(myV2Source).load() val df2 = df1 
> .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 
> 30).cast(TimestampType)) .withWatermark("epoch", "1 milliseconds") 
> .groupBy(col("epoch"), col("id")).count() val streamingQuery = df2 
> .writeStream .format("console") .trigger(Trigger.ProcessingTime("10 
> seconds")) .outputMode(OutputMode.Append()) .start() 
> streamingQuery.awaitTermination() } }}
> I get the following exception:
> {{Caused by: java.lang.AssertionError: assertion failed: No plan for 
> EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds +- Project 
> [cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as timestamp) 
> AS epoch#201, id#367L] +- DataSourceV2Relation [epoch#320L, id#367L], 
> com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
>  at scala.Predef$.assert(Predef.scala:170) at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>  at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
>  at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
>  at 
> scala.collection.TraversableOnce$$

[jira] [Updated] (SPARK-27564) 'No plan for EventTimeWatermark' error while using structured streaming with column pruning

2019-04-25 Thread Kineret (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kineret updated SPARK-27564:

Description: 
I get 'No plan for EventTimeWatermark' error while doing a query with columns 
pruning using structured streaming with a custom data source that implements 
Spark datasource v2.

My data source implementation that handles the schemas includes the following:

 

 
{code:java}
class MyDataSourceReader extends DataSourceReader with  
SupportsPushDownRequiredColumns { 
var schema: StructType = createSchema()

override def readSchema(): StructType = schema

override def pruneColumns(requiredSchema: StructType) = {
this.schema = requiredSchema
}{code}
 

and then:
{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader { ... }
{code}
This is my test code:

{\{def x(): Unit = { val df1 = 
sparkSession.readStream.format(myV2Source).load() val df2 = df1 
.withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 
30).cast(TimestampType)) .withWatermark("epoch", "1 milliseconds") 
.groupBy(col("epoch"), col("id")).count() val streamingQuery = df2 .writeStream 
.format("console") .trigger(Trigger.ProcessingTime("10 seconds")) 
.outputMode(OutputMode.Append()) .start() streamingQuery.awaitTermination() } }}

I get the following exception:

{{Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds +- Project 
[cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as timestamp) 
AS epoch#201, id#367L|#320L as double) / 3.0), 0) * 30.0) as timestamp) AS 
epoch#201, id#367L] +- DataSourceV2Relation [epoch#320L, id#367L|#320L, 
id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
 at scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) 
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) }}

Note that in the logical plan I got *DataSourceV2Relation* and not 
*StreamingDataSourceV2Relation* although I use streaming.

  was:
I get 'No plan for EventTimeWatermark' error while doing a query with columns 
pruning using structured streaming with a custom data source that implements 
Spark datasource v2.

My data source implementation that handles the schemas includes the following:

{{class MyDataSourceReader extends DataSourceReader with 
SupportsPushDownRequiredColumns \{ var schema: StructType = createSchema() 
override def readSchema(): StructType = schema override def 
pruneColumns(requiredSchema: StructType) = { this.schema = requiredSchema } }}

and then:

{{class MyDataSourceReaderStream extends MyDataSourceReader { ... }}

This is my test code:

{{def x(): Unit = \{ val df1 = 
sparkSession.readStream.format(myV2Source).load() val df2 = df1 
.withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 
30).cast(TimestampType)) .withWatermark("epoch", "1 milliseconds") 
.groupBy(col("epoch"), col("id")).count() val streamingQuery = df2 .writeStream 
.format("console") .trigger(Trigger.ProcessingTime("10 seconds")) 
.outputMode(OutputMode.Append()) .start() streamingQuery.awaitTermination() } }}

I get the following exception:

{{Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds +- Project 
[cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as timestamp) 
AS epoch#201, id#367L] +- DataSourceV2Relation [epoch#320L, id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
 at scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) 
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) }}

Note that in the logical plan I got *DataSourceV2Relation* and not 
*StreamingDataSourceV2Relation* although I use streaming.

[jira] [Updated] (SPARK-27564) 'No plan for EventTimeWatermark' error while using structured streaming with column pruning

2019-04-25 Thread Kineret (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kineret updated SPARK-27564:

Description: 
I get 'No plan for EventTimeWatermark' error while doing a query with columns 
pruning using structured streaming with a custom data source that implements 
Spark datasource v2.

My data source implementation that handles the schemas includes the following:

 

 
{code:java}
class MyDataSourceReader extends DataSourceReader with  
SupportsPushDownRequiredColumns { 
var schema: StructType = createSchema()

override def readSchema(): StructType = schema

override def pruneColumns(requiredSchema: StructType) = {
this.schema = requiredSchema
}{code}
 

and then:
{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader { ... }
{code}
This is my test code:

 

I get the following exception:

{{Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds +- Project 
[cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as timestamp) 
AS epoch#201, id#367L|#320L as double) / 3.0), 0) * 30.0) as timestamp) AS 
epoch#201, id#367L] +- DataSourceV2Relation [epoch#320L, id#367L|#320L, 
id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
 at scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) 
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) }}

Note that in the logical plan I got *DataSourceV2Relation* and not 
*StreamingDataSourceV2Relation* although I use streaming.

  was:
I get 'No plan for EventTimeWatermark' error while doing a query with columns 
pruning using structured streaming with a custom data source that implements 
Spark datasource v2.

My data source implementation that handles the schemas includes the following:

 

 
{code:java}
class MyDataSourceReader extends DataSourceReader with  
SupportsPushDownRequiredColumns { 
var schema: StructType = createSchema()

override def readSchema(): StructType = schema

override def pruneColumns(requiredSchema: StructType) = {
this.schema = requiredSchema
}{code}
 

and then:
{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader { ... }
{code}
This is my test code:

{\{def x(): Unit = { val df1 = 
sparkSession.readStream.format(myV2Source).load() val df2 = df1 
.withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 
30).cast(TimestampType)) .withWatermark("epoch", "1 milliseconds") 
.groupBy(col("epoch"), col("id")).count() val streamingQuery = df2 .writeStream 
.format("console") .trigger(Trigger.ProcessingTime("10 seconds")) 
.outputMode(OutputMode.Append()) .start() streamingQuery.awaitTermination() } }}

I get the following exception:

{{Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds +- Project 
[cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as timestamp) 
AS epoch#201, id#367L|#320L as double) / 3.0), 0) * 30.0) as timestamp) AS 
epoch#201, id#367L] +- DataSourceV2Relation [epoch#320L, id#367L|#320L, 
id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
 at scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) 
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) }}

Note that in the logical plan I got *DataSourceV2Relation* and not 
*StreamingDataSourceV2Relation* although I use streaming.


> 'No plan for EventTimeWatermark' error while using structured streaming with 
> column pruning
> ---
>
> Key: SPARK-27564
> URL: https://issues.apache.org/jira/brow

[jira] [Updated] (SPARK-27564) 'No plan for EventTimeWatermark' error while using structured streaming with column pruning

2019-04-25 Thread Kineret (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kineret updated SPARK-27564:

Description: 
I get 'No plan for EventTimeWatermark' error while doing a query with columns 
pruning using structured streaming with a custom data source that implements 
Spark datasource v2.

My data source implementation that handles the schemas includes the following: 
{code:java}
class MyDataSourceReader extends DataSourceReader with  
SupportsPushDownRequiredColumns { 
var schema: StructType = createSchema()

override def readSchema(): StructType = schema

override def pruneColumns(requiredSchema: StructType) = {
this.schema = requiredSchema
}{code}
and then:
{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader { ... }
{code}
This is my test code:
{code:java}
def x(): Unit = {
val df1 = sparkSession.readStream.format(myV2Source).load()

val df2 = df1
.withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 
30).cast(TimestampType))
.withWatermark("epoch", "1 milliseconds")
.groupBy(col("epoch"), col("id")).count()

val streamingQuery = df2
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode(OutputMode.Append())
.start()

streamingQuery.awaitTermination()
   }
{code}
I get the following exception:
{code:java}
Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as 
timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c

at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
{code}
Note that in the logical plan I got *DataSourceV2Relation* and not 
*StreamingDataSourceV2Relation* although I use streaming.

  was:
I get 'No plan for EventTimeWatermark' error while doing a query with columns 
pruning using structured streaming with a custom data source that implements 
Spark datasource v2.

My data source implementation that handles the schemas includes the following:

 

 
{code:java}
class MyDataSourceReader extends DataSourceReader with  
SupportsPushDownRequiredColumns { 
var schema: StructType = createSchema()

override def readSchema(): StructType = schema

override def pruneColumns(requiredSchema: StructType) = {
this.schema = requiredSchema
}{code}
 

and then:
{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader { ... }
{code}
This is my test code:

 

I get the following exception:

{{Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds +- Project 
[cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as timestamp) 
AS epoch#201, id#367L|#320L as double) / 3.0), 0) * 30.0) as timestamp) AS 
epoch#201, id#367L] +- DataSourceV2Relation [epoch#320L, id#367L|#320L, 
id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
 at scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) 
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) }}

Note that in the logical plan I got *DataSourceV2Relation* and not 
*StreamingDataSourceV2Relation* although I use streaming.


> 'No plan for EventTimeWatermark' error while using structured streaming with 
> column pruning
> ---
>
> 

[jira] [Updated] (SPARK-27564) 'No plan for EventTimeWatermark' error while using structured streaming with column pruning

2019-04-25 Thread Kineret (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kineret updated SPARK-27564:

Description: 
I get 'No plan for EventTimeWatermark' error while doing a query with columns 
pruning using structured streaming with a custom data source that implements 
Spark datasource v2.

My data source implementation that handles the schemas includes the following: 
{code:java}
class MyDataSourceReader extends DataSourceReader with  
SupportsPushDownRequiredColumns { 
var schema: StructType = createSchema()

override def readSchema(): StructType = schema

override def pruneColumns(requiredSchema: StructType) = {
this.schema = requiredSchema
}{code}
and then:
{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader { ... }
{code}
This is my test code:
{code:java}
def x(): Unit = {
val df1 = sparkSession.readStream.format(myV2Source).load()

val df2 = df1
.withColumn("epoch", 
(round(col("epoch")/(30*1000))*30).cast(TimestampType))
.withWatermark("epoch", "1 milliseconds")
.groupBy(col("epoch"), col("id")).count()

val streamingQuery = df2
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode(OutputMode.Append())
.start()

streamingQuery.awaitTermination()
   }
{code}
I get the following exception:
{code:java}
Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as 
timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c

at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
{code}
Note that in the logical plan I got *DataSourceV2Relation* and not 
*StreamingDataSourceV2Relation* although I use streaming.

Where is the problem? 

  was:
I get 'No plan for EventTimeWatermark' error while doing a query with columns 
pruning using structured streaming with a custom data source that implements 
Spark datasource v2.

My data source implementation that handles the schemas includes the following: 
{code:java}
class MyDataSourceReader extends DataSourceReader with  
SupportsPushDownRequiredColumns { 
var schema: StructType = createSchema()

override def readSchema(): StructType = schema

override def pruneColumns(requiredSchema: StructType) = {
this.schema = requiredSchema
}{code}
and then:
{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader { ... }
{code}
This is my test code:
{code:java}
def x(): Unit = {
val df1 = sparkSession.readStream.format(myV2Source).load()

val df2 = df1
.withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 
30).cast(TimestampType))
.withWatermark("epoch", "1 milliseconds")
.groupBy(col("epoch"), col("id")).count()

val streamingQuery = df2
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode(OutputMode.Append())
.start()

streamingQuery.awaitTermination()
   }
{code}
I get the following exception:
{code:java}
Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as 
timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c

at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.c

[jira] [Updated] (SPARK-27564) 'No plan for EventTimeWatermark' error while using structured streaming with column pruning

2019-04-25 Thread Kineret (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kineret updated SPARK-27564:

Description: 
I get 'No plan for EventTimeWatermark' error while doing a query with columns 
pruning using structured streaming with a custom data source that implements 
Spark datasource v2.

My data source implementation that handles the schemas includes the following: 
{code:java}
class MyDataSourceReader extends DataSourceReader with  
SupportsPushDownRequiredColumns { 
var schema: StructType = createSchema()

override def readSchema(): StructType = schema

override def pruneColumns(requiredSchema: StructType) = {
this.schema = requiredSchema
}{code}
and then:
{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader { ... }
{code}
This is my test code:
{code:java}
def x(): Unit = {
val df1 = sparkSession.readStream.format(myV2Source).load()

val df2 = df1
.withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 
30).cast(TimestampType))
.withWatermark("epoch", "1 milliseconds")
.groupBy(col("epoch"), col("id")).count()

val streamingQuery = df2
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode(OutputMode.Append())
.start()

streamingQuery.awaitTermination()
   }
{code}
I get the following exception:
{code:java}
Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as 
timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c

at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
{code}
Note that in the logical plan I got *DataSourceV2Relation* and not 
*StreamingDataSourceV2Relation* although I use streaming.

Where is the problem? 
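
A quick way to see which relation the analyzer actually produced is to print the 
analyzed plan. This is only an illustrative check (it assumes the {{df2}} from the 
test snippet above), not part of the original report:
{code:scala}
// Prints the analyzed logical plan; for a readStream source one would expect
// StreamingDataSourceV2Relation here rather than DataSourceV2Relation.
println(df2.queryExecution.analyzed.treeString)
{code}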

  was:
I get a 'No plan for EventTimeWatermark' error when running a query with column 
pruning, using Structured Streaming with a custom data source that implements 
Spark Data Source V2.

My data source implementation that handles the schemas includes the following: 
{code:java}
class MyDataSourceReader extends DataSourceReader
  with SupportsPushDownRequiredColumns {

  // createSchema() is defined elsewhere in this data source
  var schema: StructType = createSchema()

  override def readSchema(): StructType = schema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.schema = requiredSchema
  }
}
{code}
and then:
{code:java}
class MyDataSourceReaderStream extends MyDataSourceReader { ... }
{code}
This is my test code:
{code:java}
def x(): Unit = {
  val df1 = sparkSession.readStream.format(myV2Source).load()

  val df2 = df1
    .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
    .withWatermark("epoch", "1 milliseconds")
    .groupBy(col("epoch"), col("id")).count()

  val streamingQuery = df2
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .outputMode(OutputMode.Append())
    .start()

  streamingQuery.awaitTermination()
}
{code}
I get the following exception:
{code:java}
Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as 
timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L], 
com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c

at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
s

[jira] [Commented] (SPARK-4105) FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle

2019-04-25 Thread Aakash Mandlik (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-4105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825901#comment-16825901
 ] 

Aakash Mandlik commented on SPARK-4105:
---

I am facing a similar issue while persisting to S3 on Spark 2.4. The same code 
was working earlier on Spark 2.2.

 

19/04/25 14:42:36 WARN scheduler.TaskSetManager: Lost task 11.0 in stage 55.0 
(TID 1345, node4, executor 1): org.xerial.snappy.SnappyIOException: 
[EMPTY_INPUT] Cannot decompress empty stream
 at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:94)
 at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
 at 
org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:164)
 at 
org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
 at 
org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
 at 
org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:698)
 at 
org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:696)
 at scala.Option.map(Option.scala:146)
 at 
org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:696)
 at org.apache.spark.storage.BlockManager.get(BlockManager.scala:820)
 at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:875)
 at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

> FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based 
> shuffle
> -
>
> Key: SPARK-4105
> URL: https://issues.apache.org/jira/browse/SPARK-4105
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 1.2.0, 1.2.1, 1.3.0, 1.4.1, 1.5.1, 1.6.1, 2.0.0
>Reporter: Josh Rosen
>Assignee: Davies Liu
>Priority: Critical
> Fix For: 2.2.0
>
> Attachments: JavaObjectToSerialize.java, 
> SparkFailedToUncompressGenerator.scala
>
>
> We have seen non-deterministic {{FAILED_TO_UNCOMPRESS(5)}} errors during 
> shuffle read.  Here's a sample stacktrace from an executor:
> {code}
> 14/10/23 18:34:11 ERROR Executor: Exception in task 1747.3 in stage 11.0 (TID 
> 33053)
> java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
>   at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>   at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:391)
>   at org.xerial.snappy.Snappy.uncompress(Snappy.java:427)
>   at 
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
>   at 
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
>   at org.xerial.snappy.SnappyInputStream.(SnappyInputStream.java:58)
>   at 
> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
>   at 
> org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1090)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:116)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:115)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.c

[jira] [Assigned] (SPARK-27440) Optimize uncorrelated predicate subquery

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27440:


Assignee: Apache Spark

> Optimize uncorrelated predicate subquery
> 
>
> Key: SPARK-27440
> URL: https://issues.apache.org/jira/browse/SPARK-27440
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mingcong Han
>Assignee: Apache Spark
>Priority: Major
>
> Currently, we rewrite all the predicate subqueries(InSubquery, Exists) as 
> semi-join/anti-join. But uncorrelated predicate subquery can be evaluated 
> using a subplan instead of a join. We can firstly rewrite all the 
> uncorrelated predicate subqueries as `Exists`, then optimize it and compute 
> it using a subquery physical plan like ScalarSubquery. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27440) Optimize uncorrelated predicate subquery

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27440:


Assignee: (was: Apache Spark)

> Optimize uncorrelated predicate subquery
> 
>
> Key: SPARK-27440
> URL: https://issues.apache.org/jira/browse/SPARK-27440
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mingcong Han
>Priority: Major
>
> Currently, we rewrite all the predicate subqueries(InSubquery, Exists) as 
> semi-join/anti-join. But uncorrelated predicate subquery can be evaluated 
> using a subplan instead of a join. We can firstly rewrite all the 
> uncorrelated predicate subqueries as `Exists`, then optimize it and compute 
> it using a subquery physical plan like ScalarSubquery. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27440) Optimize uncorrelated predicate subquery

2019-04-25 Thread Mingcong Han (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mingcong Han updated SPARK-27440:
-
Description: 
Currently, we rewrite all the predicate subqueries(InSubquery, Exists) as 
semi-join/anti-join. But uncorrelated predicate subquery can be evaluated using 
a subplan instead of a join. We can firstly optimize the subquery and then 
compute it using a subquery physical plan.
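
For illustration only (not part of this ticket's text), an uncorrelated predicate 
subquery is one whose inner query does not reference the outer query, so it can be 
evaluated once on its own. Assuming hypothetical tables {{t1}} and {{t2}}:
{code:scala}
// The IN-subquery below does not reference t1, so instead of being rewritten
// as a semi-join it could be computed once as its own subplan, as proposed above.
spark.sql("SELECT * FROM t1 WHERE t1.id IN (SELECT id FROM t2 WHERE t2.active = true)")
{code}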



  was:
Currently, we rewrite all the predicate subqueries(InSubquery, Exists) as 
semi-join/anti-join. But uncorrelated predicate subquery can be evaluated using 
a subplan instead of a join. We can firstly rewrite all the uncorrelated 
predicate subqueries as `Exists`, then optimize it and compute it using a 
subquery physical plan like ScalarSubquery. 




> Optimize uncorrelated predicate subquery
> 
>
> Key: SPARK-27440
> URL: https://issues.apache.org/jira/browse/SPARK-27440
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mingcong Han
>Priority: Major
>
> Currently, we rewrite all the predicate subqueries(InSubquery, Exists) as 
> semi-join/anti-join. But uncorrelated predicate subquery can be evaluated 
> using a subplan instead of a join. We can firstly optimize the subquery and 
> then compute it using a subquery physical plan.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27340) Alias on TimeWIndow expression may cause watermark metadata lost

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27340:


Assignee: Apache Spark

> Alias on TimeWIndow expression may cause watermark metadata lost 
> -
>
> Key: SPARK-27340
> URL: https://issues.apache.org/jira/browse/SPARK-27340
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Kevin Zhang
>Assignee: Apache Spark
>Priority: Major
>
> When we use data api to write a structured streaming query job we usually 
> specify a watermark on event time column. If we define a window on the event 
> time column, the delayKey metadata of the event time column is supposed to be 
> propagated to the new column generated by time window expression. But if we 
> add additional alias on the time window column, the delayKey metadata is lost.
> Currently I find that the bug only affects stream-stream joins with equal 
> window join keys. In terms of aggregation, the grouping expression can be 
> trimmed (in the CleanupAliases rule) so the additional aliases are removed and the 
> metadata is kept.
> Here is an example:
> {code:scala}
>   val sparkSession = SparkSession
> .builder()
> .master("local")
> .getOrCreate()
>   val rateStream = sparkSession.readStream
> .format("rate")
> .option("rowsPerSecond", 10)
> .load()
> val fooStream = rateStream
>   .select(
> col("value").as("fooId"),
> col("timestamp").as("fooTime")
>   )
>   .withWatermark("fooTime", "2 seconds")
>   .select($"fooId", $"fooTime", window($"fooTime", "2 
> seconds").alias("fooWindow"))
> val barStream = rateStream
>   .where(col("value") % 2 === 0)
>   .select(
> col("value").as("barId"),
> col("timestamp").as("barTime")
>   )
>   .withWatermark("barTime", "2 seconds")
>   .select($"barId", $"barTime", window($"barTime", "2 
> seconds").alias("barWindow"))
> val joinedDf = fooStream
>   .join(
> barStream,
> $"fooId" === $"barId" &&
>   fooStream.col("fooWindow") === barStream.col("barWindow"),
> joinType = "LeftOuter"
>   )
>   val query = joinedDf
>   .writeStream
>   .format("console")
>   .option("truncate", 100)
>   .trigger(Trigger.ProcessingTime("5 seconds"))
>   .start()
> query.awaitTermination()
> {code}
> this program will end with an exception, and from the analyzed plan we can 
> see there is no delayKey metadata on 'fooWindow'
> {code:java}
> org.apache.spark.sql.AnalysisException: Stream-stream outer join between two 
> streaming DataFrame/Datasets is not supported without a watermark in the join 
> keys, or a watermark on the nullable side and an appropriate range condition;;
> Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19))
> :- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9]
> :  +- Filter isnotnull(fooTime#5-T2000ms)
> : +- Project [named_struct(start, precisetimestampconversion(CASE 
> WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(200 as double))) as 
> double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) THEN 
> (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) 
> ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) END + cast(0 as 
> bigint)) - cast(1 as bigint)) * 200) + 0), LongType, TimestampType), end, 
> precisetimestampconversion((CASE WHEN 
> (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(200 as double))) as 
> double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) THEN 
> (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) 
> ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) END + cast(0 as 
> bigint)) - cast(1 as bigint)) * 200) + 0) + 200), LongType, 
> TimestampType)) AS window#10-T2000ms, fooId#4L, fooTime#5-T2000ms]
> :+- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds
> :   +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
> :  +- StreamingRelationV2 
> org.apache.spark.sql.execution.streaming.sources.Rat

[jira] [Assigned] (SPARK-27340) Alias on TimeWIndow expression may cause watermark metadata lost

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27340:


Assignee: (was: Apache Spark)

> Alias on TimeWIndow expression may cause watermark metadata lost 
> -
>
> Key: SPARK-27340
> URL: https://issues.apache.org/jira/browse/SPARK-27340
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Kevin Zhang
>Priority: Major
>
> When we use data api to write a structured streaming query job we usually 
> specify a watermark on event time column. If we define a window on the event 
> time column, the delayKey metadata of the event time column is supposed to be 
> propagated to the new column generated by time window expression. But if we 
> add additional alias on the time window column, the delayKey metadata is lost.
> Currently I find that the bug only affects stream-stream joins with equal 
> window join keys. In terms of aggregation, the grouping expression can be 
> trimmed (in the CleanupAliases rule) so the additional aliases are removed and the 
> metadata is kept.
> Here is an example:
> {code:scala}
>   val sparkSession = SparkSession
> .builder()
> .master("local")
> .getOrCreate()
>   val rateStream = sparkSession.readStream
> .format("rate")
> .option("rowsPerSecond", 10)
> .load()
> val fooStream = rateStream
>   .select(
> col("value").as("fooId"),
> col("timestamp").as("fooTime")
>   )
>   .withWatermark("fooTime", "2 seconds")
>   .select($"fooId", $"fooTime", window($"fooTime", "2 
> seconds").alias("fooWindow"))
> val barStream = rateStream
>   .where(col("value") % 2 === 0)
>   .select(
> col("value").as("barId"),
> col("timestamp").as("barTime")
>   )
>   .withWatermark("barTime", "2 seconds")
>   .select($"barId", $"barTime", window($"barTime", "2 
> seconds").alias("barWindow"))
> val joinedDf = fooStream
>   .join(
> barStream,
> $"fooId" === $"barId" &&
>   fooStream.col("fooWindow") === barStream.col("barWindow"),
> joinType = "LeftOuter"
>   )
>   val query = joinedDf
>   .writeStream
>   .format("console")
>   .option("truncate", 100)
>   .trigger(Trigger.ProcessingTime("5 seconds"))
>   .start()
> query.awaitTermination()
> {code}
> this program will end with an exception, and from the analyzed plan we can 
> see there is no delayKey metadata on 'fooWindow'
> {code:java}
> org.apache.spark.sql.AnalysisException: Stream-stream outer join between two 
> streaming DataFrame/Datasets is not supported without a watermark in the join 
> keys, or a watermark on the nullable side and an appropriate range condition;;
> Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19))
> :- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9]
> :  +- Filter isnotnull(fooTime#5-T2000ms)
> : +- Project [named_struct(start, precisetimestampconversion(CASE 
> WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(200 as double))) as 
> double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) THEN 
> (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) 
> ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) END + cast(0 as 
> bigint)) - cast(1 as bigint)) * 200) + 0), LongType, TimestampType), end, 
> precisetimestampconversion((CASE WHEN 
> (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(200 as double))) as 
> double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) THEN 
> (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) 
> ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(200 as double))) END + cast(0 as 
> bigint)) - cast(1 as bigint)) * 200) + 0) + 200), LongType, 
> TimestampType)) AS window#10-T2000ms, fooId#4L, fooTime#5-T2000ms]
> :+- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds
> :   +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
> :  +- StreamingRelationV2 
> org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7,

[jira] [Assigned] (SPARK-27562) Complete the verification mechanism for shuffle transmitted data

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27562:


Assignee: Apache Spark

> Complete the verification mechanism for shuffle transmitted data
> 
>
> Key: SPARK-27562
> URL: https://issues.apache.org/jira/browse/SPARK-27562
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.4.0
>Reporter: feiwang
>Assignee: Apache Spark
>Priority: Major
>
> We've seen some shuffle data corruption during the shuffle read phase. 
> As described in SPARK-26089, Spark only checked small shuffle blocks before 
> PR #23453, which was proposed by ankuriitg.
> There are two changes/improvements made in PR #23453:
> 1. Large blocks are checked up to maxBytesInFlight/3 bytes in a similar way as 
> smaller blocks, so if a large block is corrupt at the start, that block will be 
> re-fetched, and if that also fails, FetchFailureException will be thrown.
> 2. If a large block is corrupt after maxBytesInFlight/3 bytes, then any 
> IOException thrown while reading the stream will be converted to 
> FetchFailureException. This is slightly more aggressive than was originally 
> intended, but since the consumer of the stream may have already read some 
> records and processed them, we can't just re-fetch the block; we need to fail 
> the whole task. Additionally, we also thought about adding a new type of 
> TaskEndReason, which would retry the task a couple of times before failing the 
> previous stage, but given the complexity involved in that solution we decided 
> not to proceed in that direction.
> However, I think there are still some problems with the current verification 
> mechanism for shuffle transmitted data:
> - A large block is only checked up to maxBytesInFlight/3 bytes when fetching 
> shuffle data, so if it is corrupt beyond that point the corruption cannot be 
> detected in the fetch phase. This has been described in the previous section.
> - Only the compressed or wrapped blocks are checked; I think we should also 
> check the blocks which are not wrapped.
> We complete the verification mechanism for shuffle transmitted data as follows:
> Firstly, we choose crc32 for the checksum verification of shuffle data. 
> Crc32 is also used for checksum verification in Hadoop; it is simple and fast.
> In the shuffle write phase, after completing the partitioned file, we compute 
> the crc32 value for each partition and then write these digests together with 
> the indexes into the shuffle index file.
> For SortShuffleWriter and the unsafe shuffle writer, there is only one 
> partitioned file per shuffle map task, so computing the digests (one per 
> partition, based on the indexes of this partitioned file) is cheap.
> For the bypass-merge shuffle writer, the number of reduce partitions is less 
> than the bypass merge threshold, so the cost of computing the digests is 
> acceptable.
> In the shuffle read phase, the digest value is passed along with the block 
> data, and we recompute the digest of the obtained data to compare it with the 
> original digest value.
> Recomputing the digest of the obtained data only needs an additional buffer 
> (2048 bytes) for computing the crc32 value.
> After recomputing, we reset the obtained data's input stream: if it supports 
> mark, we only need to reset it; otherwise it is a FileSegmentManagedBuffer and 
> we need to recreate it.
> So the verification mechanism proposed here for shuffle transmitted data is 
> cheap and complete.
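
As a rough illustration of the per-partition digest idea described above (a minimal 
sketch over a generic InputStream, not the proposed patch), a crc32 digest can be 
computed with a small reusable buffer:
{code:scala}
import java.io.InputStream
import java.util.zip.CRC32

// Computes a crc32 digest of the bytes in `in`, using a 2048-byte scratch buffer
// as mentioned in the description above.
def crc32Of(in: InputStream, bufSize: Int = 2048): Long = {
  val crc = new CRC32()
  val buf = new Array[Byte](bufSize)
  var n = in.read(buf)
  while (n != -1) {
    crc.update(buf, 0, n)
    n = in.read(buf)
  }
  crc.getValue
}
{code}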



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27562) Complete the verification mechanism for shuffle transmitted data

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27562:


Assignee: (was: Apache Spark)

> Complete the verification mechanism for shuffle transmitted data
> 
>
> Key: SPARK-27562
> URL: https://issues.apache.org/jira/browse/SPARK-27562
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.4.0
>Reporter: feiwang
>Priority: Major
>
> We've seen some shuffle data corruption during the shuffle read phase. 
> As described in SPARK-26089, Spark only checked small shuffle blocks before 
> PR #23453, which was proposed by ankuriitg.
> There are two changes/improvements made in PR #23453:
> 1. Large blocks are checked up to maxBytesInFlight/3 bytes in a similar way as 
> smaller blocks, so if a large block is corrupt at the start, that block will be 
> re-fetched, and if that also fails, FetchFailureException will be thrown.
> 2. If a large block is corrupt after maxBytesInFlight/3 bytes, then any 
> IOException thrown while reading the stream will be converted to 
> FetchFailureException. This is slightly more aggressive than was originally 
> intended, but since the consumer of the stream may have already read some 
> records and processed them, we can't just re-fetch the block; we need to fail 
> the whole task. Additionally, we also thought about adding a new type of 
> TaskEndReason, which would retry the task a couple of times before failing the 
> previous stage, but given the complexity involved in that solution we decided 
> not to proceed in that direction.
> However, I think there are still some problems with the current verification 
> mechanism for shuffle transmitted data:
> - A large block is only checked up to maxBytesInFlight/3 bytes when fetching 
> shuffle data, so if it is corrupt beyond that point the corruption cannot be 
> detected in the fetch phase. This has been described in the previous section.
> - Only the compressed or wrapped blocks are checked; I think we should also 
> check the blocks which are not wrapped.
> We complete the verification mechanism for shuffle transmitted data as follows:
> Firstly, we choose crc32 for the checksum verification of shuffle data. 
> Crc32 is also used for checksum verification in Hadoop; it is simple and fast.
> In the shuffle write phase, after completing the partitioned file, we compute 
> the crc32 value for each partition and then write these digests together with 
> the indexes into the shuffle index file.
> For SortShuffleWriter and the unsafe shuffle writer, there is only one 
> partitioned file per shuffle map task, so computing the digests (one per 
> partition, based on the indexes of this partitioned file) is cheap.
> For the bypass-merge shuffle writer, the number of reduce partitions is less 
> than the bypass merge threshold, so the cost of computing the digests is 
> acceptable.
> In the shuffle read phase, the digest value is passed along with the block 
> data, and we recompute the digest of the obtained data to compare it with the 
> original digest value.
> Recomputing the digest of the obtained data only needs an additional buffer 
> (2048 bytes) for computing the crc32 value.
> After recomputing, we reset the obtained data's input stream: if it supports 
> mark, we only need to reset it; otherwise it is a FileSegmentManagedBuffer and 
> we need to recreate it.
> So the verification mechanism proposed here for shuffle transmitted data is 
> cheap and complete.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27549) Commit Kafka Source offsets to facilitate external tooling

2019-04-25 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825994#comment-16825994
 ] 

Gabor Somogyi commented on SPARK-27549:
---

Do you mean committing offsets all the time, or would you like to differentiate 
like Flink does (checkpointing enabled/disabled)?
I presume you would like to treat committed offsets the way Flink does:
{quote}The committed offsets are only a means to expose the consumer’s progress 
for monitoring purposes.{quote}


> Commit Kafka Source offsets to facilitate external tooling
> --
>
> Key: SPARK-27549
> URL: https://issues.apache.org/jira/browse/SPARK-27549
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> Tools monitoring consumer lag could benefit from having the option of saving 
> the source offsets. Sources use the implementation of 
> org.apache.spark.sql.sources.v2.reader.streaming.
> SparkDataStream. KafkaMicroBatchStream currently [does not 
> commit|https://github.com/apache/spark/blob/5bf5d9d854db53541956dedb03e2de8eecf65b81/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala#L170]
>  anything as expected so we could expand that.
> Other streaming engines like 
> [Flink|https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration]
>  allow you to enable `auto.commit` at the expense of not having checkpointing.
> Here the proposal is to allow committing the source offsets when progress has 
> been made.
> I am also aware that another option would be to have a StreamingQueryListener 
> and intercept when batches are completed and then write the offsets anywhere 
> you need to but it would be great if Kafka integration with Structured 
> Streaming could do some of this work anyway.
> [~c...@koeninger.org]  [~marmbrus] what do you think?
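
For reference, the listener-based alternative mentioned above can be sketched as 
follows (an illustrative assumption about how offsets could be externalized today, 
not the proposed Kafka-side commit):
{code:scala}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Logs the end offsets of every source each time a micro-batch completes;
// external tooling could pick these up instead of reading Kafka committed offsets.
class OffsetLoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // endOffset is a JSON string per source (e.g. Kafka topic/partition offsets).
    event.progress.sources.foreach(s => println(s"${s.description}: ${s.endOffset}"))
  }
}
// Registration: sparkSession.streams.addListener(new OffsetLoggingListener())
{code}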



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27350) Support create table on data source V2

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27350:


Assignee: Apache Spark

> Support create table on data source V2
> --
>
> Key: SPARK-27350
> URL: https://issues.apache.org/jira/browse/SPARK-27350
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xianyang Liu
>Assignee: Apache Spark
>Priority: Major
>
> This patch adds support:
> create table on data source v2
> insert into data source v2 relation
> insert into data source v2 directory.
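
Illustrative DDL of the kind this covers, assuming a hypothetical V2 provider class 
name ({{com.example.MyV2Source}} is not a real source):
{code:scala}
spark.sql("CREATE TABLE t (id BIGINT, data STRING) USING com.example.MyV2Source")
spark.sql("INSERT INTO t VALUES (1, 'a')")
{code}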



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27350) Support create table on data source V2

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27350:


Assignee: (was: Apache Spark)

> Support create table on data source V2
> --
>
> Key: SPARK-27350
> URL: https://issues.apache.org/jira/browse/SPARK-27350
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xianyang Liu
>Priority: Major
>
> This patch adds support:
> create table on data source v2
> insert into data source v2 relation
> insert into data source v2 directory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27565) Show job info of WholeStageCodegen node on SparkSQL UI page

2019-04-25 Thread peng bo (JIRA)
peng bo created SPARK-27565:
---

 Summary: Show job info of WholeStageCodegen node on SparkSQL UI 
page
 Key: SPARK-27565
 URL: https://issues.apache.org/jira/browse/SPARK-27565
 Project: Spark
  Issue Type: Improvement
  Components: SQL, Web UI
Affects Versions: 3.0.0
Reporter: peng bo


Currently it's really hard to link SQL plan to Spark jobs on {{SparkSQL}} page. 
When one job failed, which {{PhysicalPlan(s)}} are involved are difficult to be 
located.

The PR is to show job info of WholeStageCodegen node on SparkSQL UI page.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27565) Show job info of WholeStageCodegen node on SparkSQL UI page

2019-04-25 Thread peng bo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

peng bo updated SPARK-27565:

Attachment: SPARK-27565.jpg

> Show job info of WholeStageCodegen node on SparkSQL UI page
> ---
>
> Key: SPARK-27565
> URL: https://issues.apache.org/jira/browse/SPARK-27565
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Web UI
>Affects Versions: 3.0.0
>Reporter: peng bo
>Priority: Major
> Attachments: SPARK-27565.jpg
>
>
> Currently it's really hard to link SQL plan to Spark jobs on {{SparkSQL}} 
> page. When one job failed, which {{PhysicalPlan(s)}} are involved are 
> difficult to be located.
> The PR is to show job info of WholeStageCodegen node on SparkSQL UI page.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27565) Show job info of WholeStageCodegen node on SparkSQL UI page

2019-04-25 Thread peng bo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

peng bo updated SPARK-27565:

Description: 
Currently it's really hard to link SQL plan to Spark jobs on {{SparkSQL}} page. 
When one job failed, which {{PhysicalPlan}} nodes are involved are difficult to 
be located.

The PR is to show job info of WholeStageCodegen node on SparkSQL UI page.

  was:
Currently it's really hard to link SQL plan to Spark jobs on {{SparkSQL}} page. 
When one job failed, which {{PhysicalPlan(s)}} are involved are difficult to be 
located.

The PR is to show job info of WholeStageCodegen node on SparkSQL UI page.


> Show job info of WholeStageCodegen node on SparkSQL UI page
> ---
>
> Key: SPARK-27565
> URL: https://issues.apache.org/jira/browse/SPARK-27565
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Web UI
>Affects Versions: 3.0.0
>Reporter: peng bo
>Priority: Major
> Attachments: SPARK-27565.jpg
>
>
> Currently it's really hard to link SQL plan to Spark jobs on {{SparkSQL}} 
> page. When one job failed, which {{PhysicalPlan}} nodes are involved are 
> difficult to be located.
> The PR is to show job info of WholeStageCodegen node on SparkSQL UI page.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27565) Show job info of WholeStageCodegen node on SparkSQL UI page

2019-04-25 Thread peng bo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

peng bo updated SPARK-27565:

Attachment: (was: SPARK-27565.jpg)

> Show job info of WholeStageCodegen node on SparkSQL UI page
> ---
>
> Key: SPARK-27565
> URL: https://issues.apache.org/jira/browse/SPARK-27565
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Web UI
>Affects Versions: 3.0.0
>Reporter: peng bo
>Priority: Major
>
> Currently it's really hard to link SQL plan to Spark jobs on {{SparkSQL}} 
> page. When one job failed, which {{PhysicalPlan}} nodes are involved are 
> difficult to be located.
> The PR is to show job info of WholeStageCodegen node on SparkSQL UI page.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27565) Show job info of WholeStageCodegen node on SparkSQL UI page

2019-04-25 Thread peng bo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

peng bo updated SPARK-27565:

Attachment: SPARK-27565.jpg

> Show job info of WholeStageCodegen node on SparkSQL UI page
> ---
>
> Key: SPARK-27565
> URL: https://issues.apache.org/jira/browse/SPARK-27565
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Web UI
>Affects Versions: 3.0.0
>Reporter: peng bo
>Priority: Major
> Attachments: SPARK-27565.jpg
>
>
> Currently it's really hard to link SQL plan to Spark jobs on {{SparkSQL}} 
> page. When one job failed, which {{PhysicalPlan}} nodes are involved are 
> difficult to be located.
> The PR is to show job info of WholeStageCodegen node on SparkSQL UI page.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27566) SIGSEV in Spark SQL during broadcast

2019-04-25 Thread Martin Studer (JIRA)
Martin Studer created SPARK-27566:
-

 Summary: SIGSEV in Spark SQL during broadcast
 Key: SPARK-27566
 URL: https://issues.apache.org/jira/browse/SPARK-27566
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Martin Studer


During execution of a broadcast exchange the JVM aborts with a segmentation 
fault:

{noformat}
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7feb5d024ea2, pid=26118, tid=0x7feabf1ca700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 
1.8.0_102-b14)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode linux-amd64 
compressed oops)
# Problematic frame:
# j  scala.runtime.BoxesRunTime.unboxToLong(Ljava/lang/Object;)J+9
{noformat}

The corresponding information from the {{hs_err_pid}} is:

{noformat}
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7feb5d024ea2, pid=26118, tid=0x7feabf1ca700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 
1.8.0_102-b14)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode linux-amd64 
compressed oops)
# Problematic frame:
# j  scala.runtime.BoxesRunTime.unboxToLong(Ljava/lang/Object;)J+9
#
# Failed to write core dump. Core dumps have been disabled. To enable core 
dumping, try "ulimit -c unlimited" before starting Java again
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#

---  T H R E A D  ---

Current thread (0x7feaf8c54000):  JavaThread "broadcast-exchange-2" daemon 
[_thread_in_Java, id=30475, stack(0x7feabf0ca000,0x7feabf1cb000)]

siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 
0x003c7fc8

Registers:
RAX=0x0007c0011570, RBX=0x003c7f90, RCX=0x0038, 
RDX=0x000775daf6d0
RSP=0x7feabf1c8ab0, RBP=0x7feabf1c8af0, RSI=0x7feb0830, 
RDI=0x0001
R8 =0x7feb0800b280, R9 =0x7feb0800c6a0, R10=0x7feb73d59100, 
R11=0x7feb73181700
R12=0x, R13=0x7feb5c5a5951, R14=0x7feabf1c8b00, 
R15=0x7feaf8c54000
RIP=0x7feb5d024ea2, EFLAGS=0x00010283, CSGSFS=0x0033, 
ERR=0x0004
  TRAPNO=0x000e

Top of Stack: (sp=0x7feabf1c8ab0)
0x7feabf1c8ab0:   7feabf1c8ab0 7feb5c5a5951
0x7feabf1c8ac0:   7feabf1c8b00 7feb5c5a9610
0x7feabf1c8ad0:   7feb4e626068 7feb5c5a5970
0x7feabf1c8ae0:    7feabf1c8b00
0x7feabf1c8af0:   7feabf1c8b68 7feb5d007dd0
0x7feabf1c8b00:    7feb5d007dd0
0x7feabf1c8b10:   000775daf6d0 
0x7feabf1c8b20:   000774fd2048 7feabf1c8b18
0x7feabf1c8b30:   7feb4f27548f 7feabf1c8c20
0x7feabf1c8b40:   7feb4f275cd0 
0x7feabf1c8b50:   7feb4f2755f0 7feabf1c8b10
0x7feabf1c8b60:   7feabf1c8bf0 7feabf1c8c78
0x7feabf1c8b70:   7feb5d007dd0 
0x7feabf1c8b80:    
0x7feabf1c8b90:    
0x7feabf1c8ba0:    
0x7feabf1c8bb0:    
0x7feabf1c8bc0:    
0x7feabf1c8bd0:    0001
0x7feabf1c8be0:    000774fd2048
0x7feabf1c8bf0:   000775daf6f8 000775daf6e8
0x7feabf1c8c00:    7feb5d008040
0x7feabf1c8c10:   0020 7feb5d008040
0x7feabf1c8c20:   000774fdb080 0001
0x7feabf1c8c30:   000774fd2048 7feabf1c8c28
0x7feabf1c8c40:   7feb4f7636a3 7feabf1c8cc8
0x7feabf1c8c50:   7feabfb37848 
0x7feabf1c8c60:   7feb4f763720 7feabf1c8bf0
0x7feabf1c8c70:   7feabf1c8ca0 7feabf1c8d20
0x7feabf1c8c80:   7feb5d007dd0 
0x7feabf1c8c90:    0006c03f26e8
0x7feabf1c8ca0:   0006c03f26e8  

Instructions: (pc=0x7feb5d024ea2)
0x7feb5d024e82:   89 59 10 bf 01 00 00 00 48 89 79 18 48 83 c1 30
0x7feb5d024e92:   48 89 4d e0 48 3b d8 0f 84 5b 00 00 00 8b 48 0c
0x7feb5d024ea2:   48 3b 04 0b 0f 84 4e 00 00 00 83 f9 18 0f 85 22
0x7feb5d024eb2:   00 00 00 50 48 8b c0 57 48 8b 7b 20 8b 0f 48 83 

Register to memory mapping:

RAX=0x0007c0011570 is pointing into metadata
RBX=0x003c7f90 is an unknown value
RCX=0x0038 is an unknown value
RDX=0x000775daf6d0 is an oop

[error occurred during error reporting (printing register info), id 0xb]

Stack: [0x7feabf0ca000,0x7feabf1cb000],  sp=0x7feabf1c8ab0,  free 
space=1018k
Native frames

[jira] [Updated] (SPARK-27566) SIGSEV in Spark SQL during broadcast

2019-04-25 Thread Martin Studer (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Studer updated SPARK-27566:
--
Environment: Hortonworks HDP 2.6.5, Spark 2.3.0.2.6.5.1050-37

> SIGSEV in Spark SQL during broadcast
> 
>
> Key: SPARK-27566
> URL: https://issues.apache.org/jira/browse/SPARK-27566
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: Hortonworks HDP 2.6.5, Spark 2.3.0.2.6.5.1050-37
>Reporter: Martin Studer
>Priority: Major
>
> During execution of a broadcast exchange the JVM aborts with a segmentation 
> fault:
> {noformat}
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7feb5d024ea2, pid=26118, tid=0x7feabf1ca700
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 
> 1.8.0_102-b14)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # j  scala.runtime.BoxesRunTime.unboxToLong(Ljava/lang/Object;)J+9
> {noformat}
> The corresponding information from the {{hs_err_pid}} is:
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7feb5d024ea2, pid=26118, tid=0x7feabf1ca700
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 
> 1.8.0_102-b14)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # j  scala.runtime.BoxesRunTime.unboxToLong(Ljava/lang/Object;)J+9
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> #
> ---  T H R E A D  ---
> Current thread (0x7feaf8c54000):  JavaThread "broadcast-exchange-2" 
> daemon [_thread_in_Java, id=30475, 
> stack(0x7feabf0ca000,0x7feabf1cb000)]
> siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 
> 0x003c7fc8
> Registers:
> RAX=0x0007c0011570, RBX=0x003c7f90, RCX=0x0038, 
> RDX=0x000775daf6d0
> RSP=0x7feabf1c8ab0, RBP=0x7feabf1c8af0, RSI=0x7feb0830, 
> RDI=0x0001
> R8 =0x7feb0800b280, R9 =0x7feb0800c6a0, R10=0x7feb73d59100, 
> R11=0x7feb73181700
> R12=0x, R13=0x7feb5c5a5951, R14=0x7feabf1c8b00, 
> R15=0x7feaf8c54000
> RIP=0x7feb5d024ea2, EFLAGS=0x00010283, CSGSFS=0x0033, 
> ERR=0x0004
>   TRAPNO=0x000e
> Top of Stack: (sp=0x7feabf1c8ab0)
> 0x7feabf1c8ab0:   7feabf1c8ab0 7feb5c5a5951
> 0x7feabf1c8ac0:   7feabf1c8b00 7feb5c5a9610
> 0x7feabf1c8ad0:   7feb4e626068 7feb5c5a5970
> 0x7feabf1c8ae0:    7feabf1c8b00
> 0x7feabf1c8af0:   7feabf1c8b68 7feb5d007dd0
> 0x7feabf1c8b00:    7feb5d007dd0
> 0x7feabf1c8b10:   000775daf6d0 
> 0x7feabf1c8b20:   000774fd2048 7feabf1c8b18
> 0x7feabf1c8b30:   7feb4f27548f 7feabf1c8c20
> 0x7feabf1c8b40:   7feb4f275cd0 
> 0x7feabf1c8b50:   7feb4f2755f0 7feabf1c8b10
> 0x7feabf1c8b60:   7feabf1c8bf0 7feabf1c8c78
> 0x7feabf1c8b70:   7feb5d007dd0 
> 0x7feabf1c8b80:    
> 0x7feabf1c8b90:    
> 0x7feabf1c8ba0:    
> 0x7feabf1c8bb0:    
> 0x7feabf1c8bc0:    
> 0x7feabf1c8bd0:    0001
> 0x7feabf1c8be0:    000774fd2048
> 0x7feabf1c8bf0:   000775daf6f8 000775daf6e8
> 0x7feabf1c8c00:    7feb5d008040
> 0x7feabf1c8c10:   0020 7feb5d008040
> 0x7feabf1c8c20:   000774fdb080 0001
> 0x7feabf1c8c30:   000774fd2048 7feabf1c8c28
> 0x7feabf1c8c40:   7feb4f7636a3 7feabf1c8cc8
> 0x7feabf1c8c50:   7feabfb37848 
> 0x7feabf1c8c60:   7feb4f763720 7feabf1c8bf0
> 0x7feabf1c8c70:   7feabf1c8ca0 7feabf1c8d20
> 0x7feabf1c8c80:   7feb5d007dd0 
> 0x7feabf1c8c90:    0006c03f26e8
> 0x7feabf1c8ca0:   0006c03f26e8  
> Instructions: (pc=0x7feb5d024ea2)
> 0x7feb5d024e82:   89 59 10 bf 01 00 00 00 48 89 79 18 48 83 c1 30
> 0x7feb5d024e92:   48 89 4d e0 48 3b d8 0f 84 5b 00 00 00 8b 48 0c
> 0x7feb5d024ea2:   48 3

[jira] [Commented] (SPARK-27529) Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException

2019-04-25 Thread Dmitry Goldenberg (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826072#comment-16826072
 ] 

Dmitry Goldenberg commented on SPARK-27529:
---

Hi Hyukjin, I could check although it seems this behavior has been there a 
while and in all likelihood has not changed. A check with a higher version will 
entail some effort because a lot has changed in Kafka and in Spark.

I was hoping to get a more precise answer from someone who wrote the code in 
Spark Streaming or is familiar with it at a detailed level. Is this a bug or 
expected behavior? If it's expected behavior, what's causing it and how could we 
work around it?

> Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException
> -
>
> Key: SPARK-27529
> URL: https://issues.apache.org/jira/browse/SPARK-27529
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 1.5.0
>Reporter: Dmitry Goldenberg
>Priority: Major
>
> We have a Spark Streaming consumer which at a certain point started 
> consistently failing upon a restart with the below error.
> Some details:
>  * Spark version is 1.5.0.
>  * Kafka version is 0.8.2.1 (2.10-0.8.2.1).
>  * The topic is configured with: retention.ms=1471228928, 
> max.message.bytes=1.
>  * The consumer runs with auto.offset.reset=smallest.
>  * No checkpointing is currently enabled.
> I don't see anything in the Spark or Kafka doc to understand why this is 
> happening. From googling around,
> {noformat}
> https://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
> Finally, I’ll repeat that any semantics beyond at-most-once require that you 
> have sufficient log retention in Kafka. If you’re seeing things like 
> OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka 
> storage, not because something’s wrong with Spark or Kafka.{noformat}
> Also looking at SPARK-12693 and SPARK-11693, I don't understand the possible 
> causes.
> {noformat}
> You've under-provisioned Kafka storage and / or Spark compute capacity.
> The result is that data is being deleted before it has been 
> processed.{noformat}
> All we're trying to do is start the consumer and consume from the topic from 
> the earliest available offset. Why would we not be able to do that? How can 
> the offsets be out of range if we're saying, just read from the earliest 
> available?
> Since we have the retention.ms set to 1 year and we created the topic just a 
> few weeks ago, I'd not expect any deletion being done by Kafka as we're 
> consuming.
> I'd like to understand the actual cause of this error. Any recommendations on 
> a workaround would be appreciated.
> Stack traces:
> {noformat}
> 2019-04-19 11:35:17,147 ERROR org.apache.spark.scheduler
> .TaskSetManager: Task 10 in stage 147.0 failed 4 times; aborting job
> 2019-04-19 11:35:17,160 ERROR 
> org.apache.spark.streaming.scheduler.JobScheduler: Error running job 
> streaming job 1555682554000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in 
> stage 147.0 failed 4 times, most recent failure: Lost task
> 10.3 in stage 147.0 (TID 2368, 10.150.0.58): 
> kafka.common.OffsetOutOfRangeException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
> at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
> at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at 
> scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
> at 
> com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69)
> at 
> com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24)
> at 
> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
> at 
> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
> at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
> at 
> org.apache.spark.rdd.RDD$$anonfun$fore

[jira] [Assigned] (SPARK-27547) fix DataFrame self-join problems

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27547:


Assignee: Apache Spark  (was: Wenchen Fan)

> fix DataFrame self-join problems
> 
>
> Key: SPARK-27547
> URL: https://issues.apache.org/jira/browse/SPARK-27547
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Wenchen Fan
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27547) fix DataFrame self-join problems

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27547:


Assignee: Wenchen Fan  (was: Apache Spark)

> fix DataFrame self-join problems
> 
>
> Key: SPARK-27547
> URL: https://issues.apache.org/jira/browse/SPARK-27547
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27536) Code improvements for 3.0: existentials edition

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27536:


Assignee: Sean Owen  (was: Apache Spark)

> Code improvements for 3.0: existentials edition
> ---
>
> Key: SPARK-27536
> URL: https://issues.apache.org/jira/browse/SPARK-27536
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>
> The Spark code base makes use of 'existential types' in Scala, a language 
> feature which is quasi-deprecated -- it generates a warning unless 
> scala.language.existentials is imported, and there is talk of removing it 
> from future Scala versions: 
> https://contributors.scala-lang.org/t/proposal-to-remove-existential-types-from-the-language/2785
> We can get rid of most usages of this feature with lots of minor changes to 
> the code. A PR is coming to demonstrate what's involved.
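
For illustration only (not taken from the PR), a typical rewrite replaces an 
explicit existential with a wildcard type:
{code:scala}
import scala.language.existentials

// Before: an explicit existential, which needs the language import above.
def firstOld(xs: Seq[Class[T] forSome { type T }]): Class[_] = xs.head

// After: the wildcard form expresses the same type without the feature import.
def firstNew(xs: Seq[Class[_]]): Class[_] = xs.head
{code}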



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27536) Code improvements for 3.0: existentials edition

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27536:


Assignee: Apache Spark  (was: Sean Owen)

> Code improvements for 3.0: existentials edition
> ---
>
> Key: SPARK-27536
> URL: https://issues.apache.org/jira/browse/SPARK-27536
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, Spark Core, SQL, Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Sean Owen
>Assignee: Apache Spark
>Priority: Major
>
> The Spark code base makes use of 'existential types' in Scala, a language 
> feature which is quasi-deprecated -- it generates a warning unless 
> scala.language.existentials is imported, and there is talk of removing it 
> from future Scala versions: 
> https://contributors.scala-lang.org/t/proposal-to-remove-existential-types-from-the-language/2785
> We can get rid of most usages of this feature with lots of minor changes to 
> the code. A PR is coming to demonstrate what's involved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27300) Create the new graph projects in Spark and set up build/test

2019-04-25 Thread Martin Junghanns (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826124#comment-16826124
 ] 

Martin Junghanns commented on SPARK-27300:
--

As discussed offline with [~mengxr] we will create the following modules:
{code:java}
spark-graph (umbrella module that users depend on)
|
spark-cypher (contains the query engine implementation)
|
spark-graph-api (contains property graph APIs)
{code}
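
If that layout lands, a downstream build would presumably depend only on the 
umbrella module; the coordinates below are an assumption for illustration, not 
published artifacts:
{code:scala}
// Hypothetical sbt dependency on the umbrella module described above.
libraryDependencies += "org.apache.spark" %% "spark-graph" % "3.0.0-SNAPSHOT"
{code}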

> Create the new graph projects in Spark and set up build/test
> 
>
> Key: SPARK-27300
> URL: https://issues.apache.org/jira/browse/SPARK-27300
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Martin Junghanns
>Priority: Major
>
> * Create graph projects in Spark repo that works with both maven and sbt.
> * Add internal and external dependencies.
> * Add a dummy test that automatically runs with PR build.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27565) Show job info of WholeStageCodegen node on SparkSQL UI page

2019-04-25 Thread peng bo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

peng bo updated SPARK-27565:

Description: 
Currently it's really hard to link a SQL plan to Spark jobs on the SparkSQL UI page. 
When a job fails, it is difficult to locate which PhysicalPlan nodes are 
involved. This PR shows the job info of the WholeStageCodegen node on the SparkSQL 
UI page to make life easier.
Each WholeStageCodegen node has a task metric PIPELINE_DURATION_METRIC which 
records the total duration shown in the top right corner. The idea is to put the 
job info into its metric value during metric aggregation.

  was:
Currently it's really hard to link a SQL plan to Spark jobs on the {{SparkSQL}} page. 
When a job fails, it is difficult to locate which {{PhysicalPlan}} nodes are 
involved.

The PR is to show the job info of the WholeStageCodegen node on the SparkSQL UI page.


> Show job info of WholeStageCodegen node on SparkSQL UI page
> ---
>
> Key: SPARK-27565
> URL: https://issues.apache.org/jira/browse/SPARK-27565
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Web UI
>Affects Versions: 3.0.0
>Reporter: peng bo
>Priority: Major
> Attachments: SPARK-27565.jpg
>
>
> Currently it's really hard to link a SQL plan to Spark jobs on the SparkSQL UI 
> page. When a job fails, it is difficult to locate which PhysicalPlan nodes are 
> involved. This PR shows the job info of the WholeStageCodegen node on the 
> SparkSQL UI page to make life easier.
> Each WholeStageCodegen node has a task metric PIPELINE_DURATION_METRIC which 
> records the total duration shown in the top right corner. The idea is to put 
> the job info into its metric value during metric aggregation.
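For illustration, a hypothetical helper that captures the gist of "putting the job info into the metric value" (the name and wiring are assumptions, not Spark internals):
{code:scala}
// Hypothetical: append the IDs of the jobs that executed the node's stages to
// the metric text that the SQL UI renders for the WholeStageCodegen node.
def formatPipelineMetric(durationText: String, jobIds: Seq[Int]): String =
  s"$durationText (jobs: ${jobIds.sorted.mkString(", ")})"

// formatPipelineMetric("total: 1.2 s", Seq(3, 1)) == "total: 1.2 s (jobs: 1, 3)"
{code}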



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27565) Show job info of WholeStageCodegen node on SparkSQL UI page

2019-04-25 Thread peng bo (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

peng bo updated SPARK-27565:

Description: 
Currently it's really hard to link a SQL plan to Spark jobs on the SparkSQL UI page. 
When a job fails, it is difficult to locate which {{PhysicalPlan}} nodes are 
involved. This PR shows the job info of the {{WholeStageCodegen}} node on the 
SparkSQL UI page to make life easier.
Each {{WholeStageCodegen}} node has a task metric {{PIPELINE_DURATION_METRIC}} 
which records the total duration shown in the top right corner. The idea is to 
put the job info into its metric value during metric aggregation.

  was:
Currently it's really hard to link a SQL plan to Spark jobs on the SparkSQL UI page. 
When a job fails, it is difficult to locate which PhysicalPlan nodes are 
involved. This PR shows the job info of the WholeStageCodegen node on the SparkSQL 
UI page to make life easier.
Each WholeStageCodegen node has a task metric PIPELINE_DURATION_METRIC which 
records the total duration shown in the top right corner. The idea is to put the 
job info into its metric value during metric aggregation.


> Show job info of WholeStageCodegen node on SparkSQL UI page
> ---
>
> Key: SPARK-27565
> URL: https://issues.apache.org/jira/browse/SPARK-27565
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Web UI
>Affects Versions: 3.0.0
>Reporter: peng bo
>Priority: Major
> Attachments: SPARK-27565.jpg
>
>
> Currently it's really hard to link a SQL plan to Spark jobs on the SparkSQL UI 
> page. When a job fails, it is difficult to locate which {{PhysicalPlan}} nodes 
> are involved. This PR shows the job info of the {{WholeStageCodegen}} node on 
> the SparkSQL UI page to make life easier.
> Each {{WholeStageCodegen}} node has a task metric {{PIPELINE_DURATION_METRIC}} 
> which records the total duration shown in the top right corner. The idea is to 
> put the job info into its metric value during metric aggregation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27565) Show job info of WholeStageCodegen node on SparkSQL UI page

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27565:


Assignee: (was: Apache Spark)

> Show job info of WholeStageCodegen node on SparkSQL UI page
> ---
>
> Key: SPARK-27565
> URL: https://issues.apache.org/jira/browse/SPARK-27565
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Web UI
>Affects Versions: 3.0.0
>Reporter: peng bo
>Priority: Major
> Attachments: SPARK-27565.jpg
>
>
> Currently it's really hard to link a SQL plan to Spark jobs on the SparkSQL UI 
> page. When a job fails, it is difficult to locate which {{PhysicalPlan}} nodes 
> are involved. This PR shows the job info of the {{WholeStageCodegen}} node on 
> the SparkSQL UI page to make life easier.
> Each {{WholeStageCodegen}} node has a task metric {{PIPELINE_DURATION_METRIC}} 
> which records the total duration shown in the top right corner. The idea is to 
> put the job info into its metric value during metric aggregation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27565) Show job info of WholeStageCodegen node on SparkSQL UI page

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27565:


Assignee: Apache Spark

> Show job info of WholeStageCodegen node on SparkSQL UI page
> ---
>
> Key: SPARK-27565
> URL: https://issues.apache.org/jira/browse/SPARK-27565
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL, Web UI
>Affects Versions: 3.0.0
>Reporter: peng bo
>Assignee: Apache Spark
>Priority: Major
> Attachments: SPARK-27565.jpg
>
>
> Currently it's really hard to link a SQL plan to Spark jobs on the SparkSQL UI 
> page. When a job fails, it is difficult to locate which {{PhysicalPlan}} nodes 
> are involved. This PR shows the job info of the {{WholeStageCodegen}} node on 
> the SparkSQL UI page to make life easier.
> Each {{WholeStageCodegen}} node has a task metric {{PIPELINE_DURATION_METRIC}} 
> which records the total duration shown in the top right corner. The idea is to 
> put the job info into its metric value during metric aggregation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27567) Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'

2019-04-25 Thread Dmitry Goldenberg (JIRA)
Dmitry Goldenberg created SPARK-27567:
-

 Summary: Spark Streaming consumers (from Kafka) intermittently die 
with 'SparkException: Couldn't find leaders for Set'
 Key: SPARK-27567
 URL: https://issues.apache.org/jira/browse/SPARK-27567
 Project: Spark
  Issue Type: Bug
  Components: DStreams
Affects Versions: 1.5.0
 Environment: GCP / 170~14.04.1-Ubuntu
Reporter: Dmitry Goldenberg


Some of our consumers intermittently die with the stack traces I'm including. 
Once restarted they run for a while then die again.

I can't find any cohesive documentation on what this error means and how to go 
about troubleshooting it. Any help would be appreciated.

Some of the errors seen look like this:

ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on 
10.150.0.54: remote Rpc client disassociated

Main error stack trace:
{noformat}
2019-04-23 20:36:54,323 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for time 1556066214000 ms
org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], [hdfs.hbase.acme.attachments,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], [hdfs.hbase.acme.attachments,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], [hdfs.hbase.acme.attachments,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], [hdfs.hbase.acme.attachments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], [hdfs.hbase.acme.attachments,29], [hdfs.hbase.acme.attachments,33], [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.attachments,21], [hdfs.hbase.acme.attachments,3], [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.attachments,61]))
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.Option.orElse(Option.scala:257) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar

[jira] [Updated] (SPARK-27567) Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'

2019-04-25 Thread Dmitry Goldenberg (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Goldenberg updated SPARK-27567:
--
Description: 
Some of our consumers intermittently die with the stack traces I'm including. 
Once restarted they run for a while then die again.

I can't find any cohesive documentation on what this error means and how to go 
about troubleshooting it. Any help would be appreciated.

*Kafka version* is 0.8.2.1 (2.10-0.8.2.1).

Some of the errors seen look like this:
{noformat}
ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on 
10.150.0.54: remote Rpc client disassociated{noformat}
Main error stack trace:
{noformat}
2019-04-23 20:36:54,323 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for time 1556066214000 ms
org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], [hdfs.hbase.acme.attachments,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], [hdfs.hbase.acme.attachments,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], [hdfs.hbase.acme.attachments,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], [hdfs.hbase.acme.attachments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], [hdfs.hbase.acme.attachments,29], [hdfs.hbase.acme.attachments,33], [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.attachments,21], [hdfs.hbase.acme.attachments,3], [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.attachments,61]))
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.Option.orElse(Option.scala:257) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStrea

[jira] [Commented] (SPARK-27549) Commit Kafka Source offsets to facilitate external tooling

2019-04-25 Thread Sean Glover (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826180#comment-16826180
 ] 

Sean Glover commented on SPARK-27549:
-

Yes, committing offsets would only be for the benefit of monitoring, so the 
ecosystem of Kafka consumer group offset monitoring software can be used by 
clients with deployed Spark apps. Flink manages offsets itself too, but only 
commits to Kafka for this purpose.

Below is an excerpt of the [Flink docs applicable to this 
feature|https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration]
 which I think may be a reasonable design for Spark as well.
{quote}The Flink Kafka Consumer allows configuring the behaviour of how offsets 
are committed back to Kafka brokers (or Zookeeper in 0.8). Note that the Flink 
Kafka Consumer does not rely on the committed offsets for fault tolerance 
guarantees. The committed offsets are only a means to expose the consumer’s 
progress for monitoring purposes.

The way to configure offset commit behaviour is different, depending on whether 
or not checkpointing is enabled for the job.
 * _Checkpointing disabled:_ if checkpointing is disabled, the Flink Kafka 
Consumer relies on the automatic periodic offset committing capability of the 
internally used Kafka clients. Therefore, to disable or enable offset 
committing, simply set the {{enable.auto.commit}} (or {{auto.commit.enable}} 
for Kafka 0.8) / {{auto.commit.interval.ms}} keys to appropriate values in the 
provided {{Properties}} configuration.

 * _Checkpointing enabled:_ if checkpointing is enabled, the Flink Kafka 
Consumer will commit the offsets stored in the checkpointed states when the 
checkpoints are completed. This ensures that the committed offsets in Kafka 
brokers is consistent with the offsets in the checkpointed states. Users can 
choose to disable or enable offset committing by calling 
the {{setCommitOffsetsOnCheckpoints(boolean)}} method on the consumer (by 
default, the behaviour is {{true}}). Note that in this scenario, the automatic 
periodic offset committing settings in {{Properties}} is completely ignored.
{quote}
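As a concrete reference point, a minimal sketch of the listener-based alternative mentioned in the issue description (it relies only on the public {{StreamingQueryListener}} API, surfaces each source's end offsets per micro-batch, and leaves the actual Kafka commit out):
{code:scala}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Sketch only: log the Kafka source's end offsets once per completed batch.
// A real implementation would parse the JSON and commit via a Kafka client.
class OffsetLoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { source =>
      // endOffset is a JSON string such as {"my-topic":{"0":42,"1":17}}
      println(s"batch ${event.progress.batchId}: ${source.description} -> ${source.endOffset}")
    }
  }
}

// Registration, assuming an active SparkSession named `spark`:
// spark.streams.addListener(new OffsetLoggingListener)
{code}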
 

> Commit Kafka Source offsets to facilitate external tooling
> --
>
> Key: SPARK-27549
> URL: https://issues.apache.org/jira/browse/SPARK-27549
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> Tools monitoring consumer lag could benefit from having the option of saving 
> the source offsets. Sources implement 
> org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream. 
> KafkaMicroBatchStream currently [does not 
> commit|https://github.com/apache/spark/blob/5bf5d9d854db53541956dedb03e2de8eecf65b81/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala#L170]
>  anything, as expected, so we could expand that.
> Other streaming engines like 
> [Flink|https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration]
>  allow you to enable `auto.commit` at the expense of not having checkpointing.
> Here the proposal is to allow committing the source offsets when progress has 
> been made.
> I am also aware that another option would be to have a StreamingQueryListener 
> and intercept when batches are completed and then write the offsets anywhere 
> you need to, but it would be great if the Kafka integration with Structured 
> Streaming could do some of this work anyway.
> [~c...@koeninger.org]  [~marmbrus] what do you think?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27549) Commit Kafka Source offsets to facilitate external tooling

2019-04-25 Thread Sean Glover (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826180#comment-16826180
 ] 

Sean Glover edited comment on SPARK-27549 at 4/25/19 3:41 PM:
--

Yes, committing offsets would only be for the benefit of monitoring, so the 
ecosystem of Kafka consumer group offset monitoring software can be used by 
clients with deployed Spark apps. Flink manages offsets itself too, but only 
commits to Kafka for this purpose.

Below is an excerpt of the [Flink docs applicable to this 
feature|https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration]
 which I think may be a reasonable design for Spark as well.
{quote}The Flink Kafka Consumer allows configuring the behaviour of how offsets 
are committed back to Kafka brokers (or Zookeeper in 0.8). Note that the Flink 
Kafka Consumer does not rely on the committed offsets for fault tolerance 
guarantees. The committed offsets are only a means to expose the consumer’s 
progress for monitoring purposes.

The way to configure offset commit behaviour is different, depending on whether 
or not checkpointing is enabled for the job.
 * _Checkpointing disabled:_ if checkpointing is disabled, the Flink Kafka 
Consumer relies on the automatic periodic offset committing capability of the 
internally used Kafka clients. Therefore, to disable or enable offset 
committing, simply set the {{enable.auto.commit}} (or {{auto.commit.enable}} 
for Kafka 0.8) / {{auto.commit.interval.ms}} keys to appropriate values in the 
provided {{Properties}} configuration.

 * _Checkpointing enabled:_ if checkpointing is enabled, the Flink Kafka 
Consumer will commit the offsets stored in the checkpointed states when the 
checkpoints are completed. This ensures that the committed offsets in Kafka 
brokers is consistent with the offsets in the checkpointed states. Users can 
choose to disable or enable offset committing by calling 
the {{setCommitOffsetsOnCheckpoints(boolean)}} method on the consumer (by 
default, the behaviour is {{true}}). Note that in this scenario, the automatic 
periodic offset committing settings in {{Properties}} is completely 
ignored.{quote}
 


was (Author: sean.glover):
Yes, committing offsets would only be for the benefit of monitoring so the 
ecosystem of Kafka consumer group offset monitoring software can be used by the 
client with deployed Spark apps.  Flink manages offsets themselves too, but 
only commit to Kafka for this purpose.

Below is an excerpt of the [Flink docs applicable to this 
feature|[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration]]
 which I think may be a reasonable design for Spark as well.
{quote}The Flink Kafka Consumer allows configuring the behaviour of how offsets 
are committed back to Kafka brokers (or Zookeeper in 0.8). Note that the Flink 
Kafka Consumer does not rely on the committed offsets for fault tolerance 
guarantees. The committed offsets are only a means to expose the consumer’s 
progress for monitoring purposes.

The way to configure offset commit behaviour is different, depending on whether 
or not checkpointing is enabled for the job.
 * _Checkpointing disabled:_ if checkpointing is disabled, the Flink Kafka 
Consumer relies on the automatic periodic offset committing capability of the 
internally used Kafka clients. Therefore, to disable or enable offset 
committing, simply set the {{enable.auto.commit}} (or {{auto.commit.enable}} 
for Kafka 0.8) / {{auto.commit.interval.ms}} keys to appropriate values in the 
provided {{Properties}} configuration.

 * _Checkpointing enabled:_ if checkpointing is enabled, the Flink Kafka 
Consumer will commit the offsets stored in the checkpointed states when the 
checkpoints are completed. This ensures that the committed offsets in Kafka 
brokers is consistent with the offsets in the checkpointed states. Users can 
choose to disable or enable offset committing by calling 
the{{setCommitOffsetsOnCheckpoints(boolean)}} method on the consumer (by 
default, the behaviour is {{true}}). Note that in this scenario, the automatic 
periodic offset committing settings in {{Properties}} is completely ignored.
{quote}
 

> Commit Kafka Source offsets to facilitate external tooling
> --
>
> Key: SPARK-27549
> URL: https://issues.apache.org/jira/browse/SPARK-27549
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> Tools monitoring consumer lag could benefit from having the option of saving 
> the source offsets. Sources use the implementati

[jira] [Resolved] (SPARK-27551) Improve error message of mismatched types for CASE WHEN

2019-04-25 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-27551.
---
   Resolution: Fixed
 Assignee: Liang-Chi Hsieh
Fix Version/s: 3.0.0

This is resolved via https://github.com/apache/spark/pull/24453

>  Improve error message of mismatched types for CASE WHEN
> 
>
> Key: SPARK-27551
> URL: https://issues.apache.org/jira/browse/SPARK-27551
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Huon Wilson
>Assignee: Liang-Chi Hsieh
>Priority: Minor
> Fix For: 3.0.0
>
>
> When a {{when(...).otherwise(...)}} construct has a type error, the error 
> message can be quite uninformative, since it just splats out a potentially 
> large chunk of code and says the types don't match. For instance:
> {code:none}
> scala> spark.range(100).select(when('id === 1, array(struct('id * 123456789 + 123456789 as "x"))).otherwise(array(struct('id * 987654321 + 987654321 as "y"))))
> org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (`id` = CAST(1 AS BIGINT)) THEN array(named_struct('x', ((`id` * CAST(123456789 AS BIGINT)) + CAST(123456789 AS BIGINT)))) ELSE array(named_struct('y', ((`id` * CAST(987654321 AS BIGINT)) + CAST(987654321 AS BIGINT)))) END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;;
> ...
> {code}
> The problem is the structs have different field names ({{x}} vs {{y}}), but 
> it's not obvious that this is the case, and this is a relatively simple case 
> of a single {{select}} expression.
> It would be great for the error message to at least include the types that 
> Spark has computed, to help clarify what might have gone wrong. For instance, 
> {{greatest}} and {{least}} write out the expression with the types instead of 
> values:
> {code:none}
> scala> spark.range(100).select(greatest('id, struct(lit("x"))))
> org.apache.spark.sql.AnalysisException: cannot resolve 'greatest(`id`, named_struct('col1', 'x'))' due to data type mismatch: The expressions should all have the same type, got GREATEST(bigint, struct<col1:string>).;;
> {code}
> For the example above, this might look like:
> {code:none}
> org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (`id` = CAST(1 AS BIGINT)) THEN array(named_struct('x', ((`id` * CAST(123456789 AS BIGINT)) + CAST(123456789 AS BIGINT)))) ELSE array(named_struct('y', ((`id` * CAST(987654321 AS BIGINT)) + CAST(987654321 AS BIGINT)))) END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type, got CASE WHEN ... THEN array<struct<x:bigint>> ELSE array<struct<y:bigint>> END;;
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27551) Improve error message of mismatched types for CASE WHEN

2019-04-25 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27551:
--
Summary:  Improve error message of mismatched types for CASE WHEN  (was: 
Uniformative error message for mismatched types in when().otherwise())

>  Improve error message of mismatched types for CASE WHEN
> 
>
> Key: SPARK-27551
> URL: https://issues.apache.org/jira/browse/SPARK-27551
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Huon Wilson
>Priority: Minor
>
> When a {{when(...).otherwise(...)}} construct has a type error, the error 
> message can be quite uninformative, since it just splats out a potentially 
> large chunk of code and says the types don't match. For instance:
> {code:none}
> scala> spark.range(100).select(when('id === 1, array(struct('id * 123456789 + 123456789 as "x"))).otherwise(array(struct('id * 987654321 + 987654321 as "y"))))
> org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (`id` = CAST(1 AS BIGINT)) THEN array(named_struct('x', ((`id` * CAST(123456789 AS BIGINT)) + CAST(123456789 AS BIGINT)))) ELSE array(named_struct('y', ((`id` * CAST(987654321 AS BIGINT)) + CAST(987654321 AS BIGINT)))) END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;;
> ...
> {code}
> The problem is the structs have different field names ({{x}} vs {{y}}), but 
> it's not obvious that this is the case, and this is a relatively simple case 
> of a single {{select}} expression.
> It would be great for the error message to at least include the types that 
> Spark has computed, to help clarify what might have gone wrong. For instance, 
> {{greatest}} and {{least}} write out the expression with the types instead of 
> values:
> {code:none}
> scala> spark.range(100).select(greatest('id, struct(lit("x"))))
> org.apache.spark.sql.AnalysisException: cannot resolve 'greatest(`id`, named_struct('col1', 'x'))' due to data type mismatch: The expressions should all have the same type, got GREATEST(bigint, struct<col1:string>).;;
> {code}
> For the example above, this might look like:
> {code:none}
> org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (`id` = CAST(1 AS BIGINT)) THEN array(named_struct('x', ((`id` * CAST(123456789 AS BIGINT)) + CAST(123456789 AS BIGINT)))) ELSE array(named_struct('y', ((`id` * CAST(987654321 AS BIGINT)) + CAST(987654321 AS BIGINT)))) END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type, got CASE WHEN ... THEN array<struct<x:bigint>> ELSE array<struct<y:bigint>> END;;
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27568) readLock leaked when method take() called on a cached rdd

2019-04-25 Thread wuyi (JIRA)
wuyi created SPARK-27568:


 Summary: readLock leaked when method take() called on a cached rdd
 Key: SPARK-27568
 URL: https://issues.apache.org/jira/browse/SPARK-27568
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0, 2.3.0, 2.2.0, 2.1.0, 2.0.0
Reporter: wuyi


 
{code:java}
sc.parallelize(Range(0, 10), 1).cache().take(1){code}
 

If we execute the above code in Spark, we'll see the line below in the log:

*19/04/25 23:48:54 INFO Executor: 1 block locks were not released by TID = 0:*
*[rdd_0_0]* 

 

And if we set "spark.storage.exceptionOnPinLeak" to true, the job will fail.
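For completeness, a minimal sketch of setting that flag when reproducing (the {{local[*]}} master is only for illustration):
{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// With exceptionOnPinLeak enabled, the leaked read lock fails the job instead
// of only logging the "block locks were not released" warning.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("read-lock-leak-repro")
  .set("spark.storage.exceptionOnPinLeak", "true")
val sc = new SparkContext(conf)

sc.parallelize(Range(0, 10), 1).cache().take(1)
{code}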



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27248) REFRESH TABLE should recreate cache with same cache name and storage level

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27248:


Assignee: (was: Apache Spark)

> REFRESH TABLE should recreate cache with same cache name and storage level
> --
>
> Key: SPARK-27248
> URL: https://issues.apache.org/jira/browse/SPARK-27248
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: William Wong
>Priority: Major
>
> If we refresh a cached table, the table cache will first be uncached and then 
> recached (lazily). Currently, the logic is embedded in the 
> CatalogImpl.refreshTable method.
> The current implementation does not preserve the cache name and storage 
> level. As a result, the cache name and storage level could change after a 
> REFRESH. IMHO, that is not what a user would expect.
> I would like to fix this behavior by first saving the cache name and storage 
> level, and then reusing them when recaching the table.
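A minimal sketch of the scenario being described, using the existing {{Catalog}} API (the table name is a placeholder):
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").appName("refresh-cache").getOrCreate()
spark.range(10).write.saveAsTable("t")

// Cache with a non-default storage level...
spark.catalog.cacheTable("t", StorageLevel.DISK_ONLY)

// ...then refresh. Per this report, the lazily re-created cache does not keep
// the original cache name and storage level.
spark.catalog.refreshTable("t")
{code}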



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27248) REFRESH TABLE should recreate cache with same cache name and storage level

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27248:


Assignee: Apache Spark

> REFRESH TABLE should recreate cache with same cache name and storage level
> --
>
> Key: SPARK-27248
> URL: https://issues.apache.org/jira/browse/SPARK-27248
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: William Wong
>Assignee: Apache Spark
>Priority: Major
>
> If we refresh a cached table, the table cache will first be uncached and then 
> recached (lazily). Currently, the logic is embedded in the 
> CatalogImpl.refreshTable method.
> The current implementation does not preserve the cache name and storage 
> level. As a result, the cache name and storage level could change after a 
> REFRESH. IMHO, that is not what a user would expect.
> I would like to fix this behavior by first saving the cache name and storage 
> level, and then reusing them when recaching the table.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27272) Enable blacklisting of node/executor on fetch failures by default

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27272:


Assignee: (was: Apache Spark)

> Enable blacklisting of node/executor on fetch failures by default
> -
>
> Key: SPARK-27272
> URL: https://issues.apache.org/jira/browse/SPARK-27272
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Ankur Gupta
>Priority: Major
>
> SPARK-20898 added a new configuration to blacklist a node/executor on fetch
> failures. This config was deemed risky at the time and was disabled by default
> until more data could be collected.
> This change aims to enable that feature by default, as we have seen a couple 
> of instances where it was found to be useful. The failures occur because of 
> issues with the node manager and the external shuffle service on that node.
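Until the default changes, the behavior can be opted into explicitly; a minimal sketch (the config key comes from SPARK-20898):
{code:scala}
import org.apache.spark.sql.SparkSession

// Opt in to blacklisting a node/executor after fetch failures.
val spark = SparkSession.builder()
  .appName("fetch-failure-blacklist")
  .config("spark.blacklist.application.fetchFailure.enabled", "true")
  .getOrCreate()
{code}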



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27297) Add higher order functions to Scala API

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27297:


Assignee: Apache Spark

> Add higher order functions to Scala API
> ---
>
> Key: SPARK-27297
> URL: https://issues.apache.org/jira/browse/SPARK-27297
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Nikolas Vanderhoof
>Assignee: Apache Spark
>Priority: Major
>
> There is currently no Scala API equivalent for the higher-order functions 
> introduced in Spark 2.4.0.
>  * transform
>  * aggregate
>  * filter
>  * exists
>  * zip_with
>  * map_zip_with
>  * map_filter
>  * transform_values
>  * transform_keys
> Equivalent column based functions should be added to the Scala API for 
> org.apache.spark.sql.functions with the following signatures:
>  
> {code:scala}
> def transform(column: Column, f: Column => Column): Column = ???
> def transform(column: Column, f: (Column, Column) => Column): Column = ???
> def exists(column: Column, f: Column => Column): Column = ???
> def filter(column: Column, f: Column => Column): Column = ???
> def aggregate(
> expr: Column,
> zero: Column,
> merge: (Column, Column) => Column,
> finish: Column => Column): Column = ???
> def aggregate(
> expr: Column,
> zero: Column,
> merge: (Column, Column) => Column): Column = ???
> def zip_with(
> left: Column,
> right: Column,
> f: (Column, Column) => Column): Column = ???
> def transform_keys(expr: Column, f: (Column, Column) => Column): Column = ???
> def transform_values(expr: Column, f: (Column, Column) => Column): Column = 
> ???
> def map_filter(expr: Column, f: (Column, Column) => Column): Column = ???
> def map_zip_with(left: Column, right: Column, f: (Column, Column, Column) => 
> Column): Column = ???
> {code}
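For illustration, a usage sketch that assumes the proposed {{transform}} overload above is added to {{org.apache.spark.sql.functions}} (it does not exist there today):
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("hof-example").getOrCreate()
import spark.implicits._

val df = Seq(Seq(1, 2, 3)).toDF("xs")

// Column-based equivalent of the SQL: SELECT transform(xs, x -> x + 1)
df.select(transform($"xs", x => x + 1).as("plus_one")).show()
{code}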



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27229) GroupBy Placement in Intersect Distinct

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27229:


Assignee: Apache Spark

> GroupBy Placement in Intersect Distinct
> ---
>
> Key: SPARK-27229
> URL: https://issues.apache.org/jira/browse/SPARK-27229
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Song Jun
>Assignee: Apache Spark
>Priority: Major
>
> The Intersect operator will be replaced by a Left Semi Join in the Optimizer.
> For example:
> SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2
>  ==>  SELECT DISTINCT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND 
> a2<=>b2
> If Tab1 and Tab2 are too large, the join will be very slow. We can reduce the 
> table data before the join by placing a group-by operator under the join:
> ==>  
> SELECT a1, a2 FROM 
>(SELECT a1,a2 FROM Tab1 GROUP BY a1,a2) X
>LEFT SEMI JOIN 
>(SELECT b1,b2 FROM Tab2 GROUP BY b1,b2) Y
> ON X.a1<=>Y.b1 AND X.a2<=>Y.b2
> then the join executes over smaller inputs, because the group-by has already 
> removed a lot of data.
>  
> A PR will be submitted soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27237) Introduce State schema validation among query restart

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27237:


Assignee: (was: Apache Spark)

> Introduce State schema validation among query restart
> -
>
> Key: SPARK-27237
> URL: https://issues.apache.org/jira/browse/SPARK-27237
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Priority: Major
>
> Even though the Spark Structured Streaming guide clearly documents that "Any 
> change in number or type of grouping keys or aggregates is not allowed.", 
> Spark doesn't do anything when end users try it, which ends up producing 
> nondeterministic outputs or unexpected exceptions.
> Even worse, if the query happens not to crash, it could write the new, 
> inconsistent values to the state store, which completely breaks the state 
> unless end users roll back to a specific batch by manually editing the 
> checkpoint.
> The restriction is clear: the number of columns and the data type of each must 
> not change between query runs. We can store the state schema along with the 
> state, and verify whether a (possibly) new schema is compatible whenever it 
> changes. With this validation we can prevent a query from running and showing 
> nondeterministic behavior when the schema is incompatible, and we can give 
> more informative error messages to end users.
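A minimal sketch of the kind of check this implies (illustrative only, not the proposed implementation):
{code:scala}
import org.apache.spark.sql.types.StructType

// Compare the persisted state schema with the schema of the restarted query
// and fail fast with a clear message if they diverge.
def checkStateSchema(stored: StructType, current: StructType): Unit = {
  val compatible = stored.length == current.length &&
    stored.fields.zip(current.fields).forall { case (a, b) => a.dataType == b.dataType }
  require(compatible, s"State schema changed between runs: stored=$stored, current=$current")
}
{code}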



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27297) Add higher order functions to Scala API

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27297:


Assignee: (was: Apache Spark)

> Add higher order functions to Scala API
> ---
>
> Key: SPARK-27297
> URL: https://issues.apache.org/jira/browse/SPARK-27297
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Nikolas Vanderhoof
>Priority: Major
>
> There is currently no Scala API equivalent for the higher-order functions 
> introduced in Spark 2.4.0.
>  * transform
>  * aggregate
>  * filter
>  * exists
>  * zip_with
>  * map_zip_with
>  * map_filter
>  * transform_values
>  * transform_keys
> Equivalent column based functions should be added to the Scala API for 
> org.apache.spark.sql.functions with the following signatures:
>  
> {code:scala}
> def transform(column: Column, f: Column => Column): Column = ???
> def transform(column: Column, f: (Column, Column) => Column): Column = ???
> def exists(column: Column, f: Column => Column): Column = ???
> def filter(column: Column, f: Column => Column): Column = ???
> def aggregate(
> expr: Column,
> zero: Column,
> merge: (Column, Column) => Column,
> finish: Column => Column): Column = ???
> def aggregate(
> expr: Column,
> zero: Column,
> merge: (Column, Column) => Column): Column = ???
> def zip_with(
> left: Column,
> right: Column,
> f: (Column, Column) => Column): Column = ???
> def transform_keys(expr: Column, f: (Column, Column) => Column): Column = ???
> def transform_values(expr: Column, f: (Column, Column) => Column): Column = 
> ???
> def map_filter(expr: Column, f: (Column, Column) => Column): Column = ???
> def map_zip_with(left: Column, right: Column, f: (Column, Column, Column) => 
> Column): Column = ???
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26356) Remove SaveMode from data source v2 API

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26356:


Assignee: Apache Spark

> Remove SaveMode from data source v2 API
> ---
>
> Key: SPARK-26356
> URL: https://issues.apache.org/jira/browse/SPARK-26356
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Wenchen Fan
>Assignee: Apache Spark
>Priority: Blocker
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27204) First time Loading application page from History Server is taking time when event log size is huge

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27204:


Assignee: Apache Spark

> First time Loading application page from History Server is taking time when 
> event log size is huge
> --
>
> Key: SPARK-27204
> URL: https://issues.apache.org/jira/browse/SPARK-27204
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, Web UI
>Affects Versions: 2.3.3, 2.4.0
>Reporter: ABHISHEK KUMAR GUPTA
>Assignee: Apache Spark
>Priority: Minor
>
> 1. Launch spark-shell and submit a long-running job.
> 2. Measure the first-time loading time of the Job History page.
> 3. For example, with an event log size of 18 GB and the disk store, the 
> application page takes 47 minutes to load the first time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27232) Ignore file locality in InMemoryFileIndex if spark.locality.wait is set to

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27232:


Assignee: (was: Apache Spark)

> Ignore file locality in InMemoryFileIndex if spark.locality.wait is set to
> --
>
> Key: SPARK-27232
> URL: https://issues.apache.org/jira/browse/SPARK-27232
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: EdisonWang
>Priority: Minor
>
> `InMemoryFileIndex` needs to request file block location information in order 
> to do locality scheduling in `TaskSetManager`. 
> Usually this is a time-consuming task. For example, in our production 
> environment there are 24 partitions with 149,925 files totaling 83 TB. It 
> takes about 10 minutes to request the file block locations before submitting 
> a Spark job. Even when I set 
> `spark.sql.sources.parallelPartitionDiscovery.threshold` to 24 to parallelize 
> it, it still takes 2 minutes. 
> This is a waste if we don't care about file locality (for example, when 
> storage and compute are separated).
> So there should be a conf to control whether we need to send 
> `getFileBlockLocations` requests to the HDFS NameNode. If the user sets 
> `spark.locality.wait` to 0, file block location information is meaningless. 
> With this PR, if `spark.locality.wait` is set to 0, Spark will no longer 
> request file location information, which saves several seconds to minutes.
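For context, the two settings mentioned above can be combined today as follows (a sketch; the proposed change would additionally skip the block-location requests when the wait is 0):
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("no-locality-listing")
  // Locality scheduling is effectively disabled...
  .config("spark.locality.wait", "0")
  // ...and partition discovery is parallelized across the 24 partitions.
  .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "24")
  .getOrCreate()
{code}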



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27204) First time Loading application page from History Server is taking time when event log size is huge

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27204:


Assignee: (was: Apache Spark)

> First time Loading application page from History Server is taking time when 
> event log size is huge
> --
>
> Key: SPARK-27204
> URL: https://issues.apache.org/jira/browse/SPARK-27204
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, Web UI
>Affects Versions: 2.3.3, 2.4.0
>Reporter: ABHISHEK KUMAR GUPTA
>Priority: Minor
>
> 1. Launch spark-shell and submit a long-running job.
> 2. Measure the first-time loading time of the Job History page.
> 3. For example, with an event log size of 18 GB and the disk store, the 
> application page takes 47 minutes to load the first time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27280) infer filters from Join's OR condition

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27280:


Assignee: (was: Apache Spark)

> infer filters from Join's OR condition
> --
>
> Key: SPARK-27280
> URL: https://issues.apache.org/jira/browse/SPARK-27280
> Project: Spark
>  Issue Type: Improvement
>  Components: Optimizer, SQL
>Affects Versions: 3.0.0
>Reporter: Song Jun
>Priority: Major
>
> In some cases, we can infer filters from a Join condition that contains OR 
> expressions. For example, TPC-DS query 48:
> {code:java}
> select sum (ss_quantity)
>  from store_sales, store, customer_demographics, customer_address, date_dim
>  where s_store_sk = ss_store_sk
>  and  ss_sold_date_sk = d_date_sk and d_year = 2000
>  and  
>  (
>   (
>cd_demo_sk = ss_cdemo_sk
>and 
>cd_marital_status = 'S'
>and 
>cd_education_status = 'Secondary'
>and 
>ss_sales_price between 100.00 and 150.00  
>)
>  or
>   (
>   cd_demo_sk = ss_cdemo_sk
>and 
>cd_marital_status = 'M'
>and 
>cd_education_status = 'College'
>and 
>ss_sales_price between 50.00 and 100.00   
>   )
>  or 
>  (
>   cd_demo_sk = ss_cdemo_sk
>   and 
>cd_marital_status = 'U'
>and 
>cd_education_status = '2 yr Degree'
>and 
>ss_sales_price between 150.00 and 200.00  
>  )
>  )
>  and
>  (
>   (
>   ss_addr_sk = ca_address_sk
>   and
>   ca_country = 'United States'
>   and
>   ca_state in ('AL', 'OH', 'MD')
>   and ss_net_profit between 0 and 2000  
>   )
>  or
>   (ss_addr_sk = ca_address_sk
>   and
>   ca_country = 'United States'
>   and
>   ca_state in ('VA', 'TX', 'IA')
>   and ss_net_profit between 150 and 3000 
>   )
>  or
>   (ss_addr_sk = ca_address_sk
>   and
>   ca_country = 'United States'
>   and
>   ca_state in ('RI', 'WI', 'KY')
>   and ss_net_profit between 50 and 25000 
>   )
>  )
> ;
> {code}
> we can infer two filters from the join's OR condition:
> {code:java}
> for customer_demographics:
> cd_marital_status in ('S', 'M', 'U') and cd_education_status in ('Secondary', 
> 'College', '2 yr Degree')
> for store_sales:
>  (ss_sales_price between 100.00 and 150.00 or ss_sales_price between 50.00 
> and 100.00 or ss_sales_price between 150.00 and 200.00)
> {code}
> then we can push down the above two filters to filter 
> customer_demographics/store_sales.
> A PR will be submitted soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27254) Cleanup complete but becoming invalid output files in ManifestFileCommitProtocol if job is aborted

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27254:


Assignee: Apache Spark

> Cleanup complete but becoming invalid output files in 
> ManifestFileCommitProtocol if job is aborted
> --
>
> Key: SPARK-27254
> URL: https://issues.apache.org/jira/browse/SPARK-27254
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Assignee: Apache Spark
>Priority: Minor
>
> ManifestFileCommitProtocol doesn't clean up completed (but now invalid) 
> output files when a job is aborted.
> ManifestFileCommitProtocol does nothing for cleanup when a job is aborted; it 
> only maintains the metadata listing which completed output files have been 
> written. SPARK-27210 addressed task-level cleanup, but files are still not 
> cleaned up at the job level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27254) Cleanup complete but becoming invalid output files in ManifestFileCommitProtocol if job is aborted

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27254:


Assignee: (was: Apache Spark)

> Cleanup complete but becoming invalid output files in 
> ManifestFileCommitProtocol if job is aborted
> --
>
> Key: SPARK-27254
> URL: https://issues.apache.org/jira/browse/SPARK-27254
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Priority: Minor
>
> ManifestFileCommitProtocol doesn't clean up completed (but now invalid) 
> output files when a job is aborted.
> ManifestFileCommitProtocol does nothing for cleanup when a job is aborted; it 
> only maintains the metadata listing which completed output files have been 
> written. SPARK-27210 addressed task-level cleanup, but files are still not 
> cleaned up at the job level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27280) infer filters from Join's OR condition

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27280:


Assignee: Apache Spark

> infer filters from Join's OR condition
> --
>
> Key: SPARK-27280
> URL: https://issues.apache.org/jira/browse/SPARK-27280
> Project: Spark
>  Issue Type: Improvement
>  Components: Optimizer, SQL
>Affects Versions: 3.0.0
>Reporter: Song Jun
>Assignee: Apache Spark
>Priority: Major
>
> In some cases, we can infer filters from a Join condition that contains OR 
> expressions. For example, TPC-DS query 48:
> {code:java}
> select sum (ss_quantity)
>  from store_sales, store, customer_demographics, customer_address, date_dim
>  where s_store_sk = ss_store_sk
>  and  ss_sold_date_sk = d_date_sk and d_year = 2000
>  and  
>  (
>   (
>cd_demo_sk = ss_cdemo_sk
>and 
>cd_marital_status = 'S'
>and 
>cd_education_status = 'Secondary'
>and 
>ss_sales_price between 100.00 and 150.00  
>)
>  or
>   (
>   cd_demo_sk = ss_cdemo_sk
>and 
>cd_marital_status = 'M'
>and 
>cd_education_status = 'College'
>and 
>ss_sales_price between 50.00 and 100.00   
>   )
>  or 
>  (
>   cd_demo_sk = ss_cdemo_sk
>   and 
>cd_marital_status = 'U'
>and 
>cd_education_status = '2 yr Degree'
>and 
>ss_sales_price between 150.00 and 200.00  
>  )
>  )
>  and
>  (
>   (
>   ss_addr_sk = ca_address_sk
>   and
>   ca_country = 'United States'
>   and
>   ca_state in ('AL', 'OH', 'MD')
>   and ss_net_profit between 0 and 2000  
>   )
>  or
>   (ss_addr_sk = ca_address_sk
>   and
>   ca_country = 'United States'
>   and
>   ca_state in ('VA', 'TX', 'IA')
>   and ss_net_profit between 150 and 3000 
>   )
>  or
>   (ss_addr_sk = ca_address_sk
>   and
>   ca_country = 'United States'
>   and
>   ca_state in ('RI', 'WI', 'KY')
>   and ss_net_profit between 50 and 25000 
>   )
>  )
> ;
> {code}
> We can infer two filters from the join's OR condition:
> {code:java}
> for customer_demographics:
> cd_marital_status in ('S', 'M', 'U') and cd_education_status in ('Secondary', 
> 'College', '2 yr Degree')
> for store_sales:
> (ss_sales_price between 100.00 and 150.00 or ss_sales_price between 50.00 
> and 100.00 or ss_sales_price between 150.00 and 200.00)
> {code}
> Then we can push down these two filters to reduce 
> customer_demographics/store_sales before the join.
> A PR will be submitted soon.
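
A rough illustration of the intended effect in the DataFrame API, assuming a SparkSession named {{spark}} with the TPC-DS tables registered. The single-table predicates inferred from the OR'ed join condition are applied below the join so both inputs shrink before joining; the proposed optimizer rule would derive these filters automatically.
{code:scala}
val prunedCustomerDemographics = spark.table("customer_demographics")
  .where("cd_marital_status IN ('S', 'M', 'U') AND " +
    "cd_education_status IN ('Secondary', 'College', '2 yr Degree')")

val prunedStoreSales = spark.table("store_sales")
  .where("ss_sales_price BETWEEN 100.00 AND 150.00 OR " +
    "ss_sales_price BETWEEN 50.00 AND 100.00 OR " +
    "ss_sales_price BETWEEN 150.00 AND 200.00")
{code}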



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27229) GroupBy Placement in Intersect Distinct

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27229:


Assignee: (was: Apache Spark)

> GroupBy Placement in Intersect Distinct
> ---
>
> Key: SPARK-27229
> URL: https://issues.apache.org/jira/browse/SPARK-27229
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Song Jun
>Priority: Major
>
> The Intersect operator is replaced by a Left Semi Join in the Optimizer.
> For example:
> SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2
>  ==>  SELECT DISTINCT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND 
> a2<=>b2
> If Tab1 and Tab2 are very large, the join will be very slow. We can reduce 
> the amount of table data before the join by placing a GROUP BY operator 
> under the join, that is:
> ==>
> SELECT a1, a2 FROM 
>(SELECT a1, a2 FROM Tab1 GROUP BY a1, a2) X
>LEFT SEMI JOIN 
>(SELECT b1, b2 FROM Tab2 GROUP BY b1, b2) Y
> ON X.a1<=>Y.b1 AND X.a2<=>Y.b2
> Then the join executes over much smaller inputs, because the GROUP BY has 
> already removed duplicate rows.
> A PR will be submitted soon.
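
A minimal sketch of the proposed rewrite in the DataFrame API, assuming a SparkSession named {{spark}} with tables Tab1(a1, a2) and Tab2(b1, b2); distinct() stands in for the GROUP BY on the grouping columns.
{code:scala}
// Deduplicate each side first, then run the null-safe left-semi join on the
// much smaller inputs.
val x = spark.table("Tab1").select("a1", "a2").distinct()
val y = spark.table("Tab2").select("b1", "b2").distinct()

val intersected = x
  .join(y, x("a1") <=> y("b1") && x("a2") <=> y("b2"), "left_semi")
  .select("a1", "a2")
{code}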



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27237) Introduce State schema validation among query restart

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27237:


Assignee: Apache Spark

> Introduce State schema validation among query restart
> -
>
> Key: SPARK-27237
> URL: https://issues.apache.org/jira/browse/SPARK-27237
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Jungtaek Lim
>Assignee: Apache Spark
>Priority: Major
>
> Even though the Spark Structured Streaming guide clearly documents that "Any 
> change in number or type of grouping keys or aggregates is not allowed.", 
> Spark does nothing when end users try it, which ends up producing 
> nondeterministic outputs or unexpected exceptions.
> Even worse, if the query happens not to crash, it can write the new, corrupted 
> values to the state, which completely breaks the state unless end users roll 
> back to a specific batch by manually editing the checkpoint.
> The restriction is clear: the number of columns and the data type of each must 
> not change between query runs. We can store the state schema along with the 
> state, and verify that the (possibly) new schema is compatible when the query 
> restarts. With this validation we can prevent a query from running with 
> nondeterministic behavior when the schema is incompatible, and we can also give 
> more informative error messages to end users.
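
A hypothetical sketch of the proposed check; the method name and compatibility rule are illustrative, not an existing Spark API. The idea is to compare the schema persisted alongside the state with the schema of the restarted query and fail fast when they differ.
{code:scala}
import org.apache.spark.sql.types.StructType

def validateStateSchema(stored: StructType, current: StructType): Unit = {
  // Compatible here means: same number of columns and same data type per column.
  val compatible =
    stored.length == current.length &&
      stored.fields.zip(current.fields).forall { case (oldField, newField) =>
        oldField.dataType == newField.dataType
      }
  require(compatible,
    s"State schema changed between query runs. Stored: $stored, current: $current")
}
{code}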



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27272) Enable blacklisting of node/executor on fetch failures by default

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27272:


Assignee: Apache Spark

> Enable blacklisting of node/executor on fetch failures by default
> -
>
> Key: SPARK-27272
> URL: https://issues.apache.org/jira/browse/SPARK-27272
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Ankur Gupta
>Assignee: Apache Spark
>Priority: Major
>
> SPARK-20898 added a new configuration to blacklist a node/executor on fetch 
> failures. The config was deemed risky at the time and was disabled by default 
> until more data could be collected.
> This change aims to enable that feature by default, as we have seen a couple of 
> instances where it proved useful. The failures occur because of issues with the 
> node manager and the external shuffle service on that node.
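
For reference, a usage sketch showing how the feature can be turned on explicitly today; the config key is assumed to be the one introduced by SPARK-20898.
{code:scala}
import org.apache.spark.sql.SparkSession

// Enable blacklisting of the node/executor after a fetch failure.
val spark = SparkSession.builder()
  .appName("blacklist-on-fetch-failure")
  .config("spark.blacklist.application.fetchFailure.enabled", "true")
  .getOrCreate()
{code}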



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27281) Wrong latest offsets returned by DirectKafkaInputDStream#latestOffsets

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27281:


Assignee: Apache Spark

> Wrong latest offsets returned by DirectKafkaInputDStream#latestOffsets
> --
>
> Key: SPARK-27281
> URL: https://issues.apache.org/jira/browse/SPARK-27281
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0
>Reporter: Viacheslav Krot
>Assignee: Apache Spark
>Priority: Major
>
> I have a very strange and hard-to-reproduce issue when using Kafka direct 
> streaming, version 2.4.0.
> From time to time, maybe once a day to once a week, I get the following error:
> {noformat}
> java.lang.IllegalArgumentException: requirement failed: numRecords must not 
> be negative
> at scala.Predef$.require(Predef.scala:224)
> at 
> org.apache.spark.streaming.scheduler.StreamInputInfo.(InputInfoTracker.scala:38)
> at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:250)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
> at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
> at scala.Option.orElse(Option.scala:289)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
> at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
> at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> at scala.util.Try$.apply(Try.scala:192)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> 19/01/29 13:10:00 ERROR apps.BusinessRuleEngine: Job failed. Stopping JVM
> java.lang.IllegalArgumentException: requirement failed: numRecords must not 
> be negative
> at scala.Predef$.require(Predef.scala:224)
> at 
> org.apache.spark.streaming.scheduler.StreamInputInfo.(InputInfoTracker.scala:38)
> at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:250)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
> at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
> at 
> org.apache.spark.s

[jira] [Assigned] (SPARK-27295) Provision to provide initial values for each source node in personalised page rank - Graphx

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27295:


Assignee: (was: Apache Spark)

> Provision to provide initial values for each source node in personalised page 
> rank - Graphx
> ---
>
> Key: SPARK-27295
> URL: https://issues.apache.org/jira/browse/SPARK-27295
> Project: Spark
>  Issue Type: Improvement
>  Components: GraphX
>Affects Versions: 2.4.0
>Reporter: Eshwar S R
>Priority: Major
>
> The present implementation of the parallel personalized PageRank algorithm 
> takes only node ids as the starting nodes and then assigns an initial value 
> of 1.0 to all of those source nodes.
> But the user might also be interested in specifying the initial value for 
> each source node.
> I have made the very small modification to the existing code needed to 
> achieve this. I thought it might help more people if I share it here, hence 
> I am raising a PR for the same.
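
For context, a sketch of the current GraphX entry point, assuming a SparkContext named {{sc}}: only the vertex ids of the sources can be passed, so every source implicitly starts from the same weight. The proposal is to let callers also supply a per-source initial value.
{code:scala}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.graphx.lib.PageRank

// A tiny example graph.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0), Edge(3L, 1L, 1.0)))
val graph = Graph.fromEdges(edges, 1.0)

// Today only the ids are configurable; the initial value per source is fixed.
val sources: Array[VertexId] = Array(1L, 3L)
val ranks = PageRank.runParallelPersonalizedPageRank(graph, 10, 0.15, sources)
{code}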



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27281) Wrong latest offsets returned by DirectKafkaInputDStream#latestOffsets

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27281:


Assignee: (was: Apache Spark)

> Wrong latest offsets returned by DirectKafkaInputDStream#latestOffsets
> --
>
> Key: SPARK-27281
> URL: https://issues.apache.org/jira/browse/SPARK-27281
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0
>Reporter: Viacheslav Krot
>Priority: Major
>
> I have a very strange and hard-to-reproduce issue when using Kafka direct 
> streaming, version 2.4.0.
> From time to time, maybe once a day to once a week, I get the following error:
> {noformat}
> java.lang.IllegalArgumentException: requirement failed: numRecords must not 
> be negative
> at scala.Predef$.require(Predef.scala:224)
> at 
> org.apache.spark.streaming.scheduler.StreamInputInfo.(InputInfoTracker.scala:38)
> at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:250)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
> at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
> at scala.Option.orElse(Option.scala:289)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
> at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
> at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> at scala.util.Try$.apply(Try.scala:192)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
> at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> 19/01/29 13:10:00 ERROR apps.BusinessRuleEngine: Job failed. Stopping JVM
> java.lang.IllegalArgumentException: requirement failed: numRecords must not 
> be negative
> at scala.Predef$.require(Predef.scala:224)
> at 
> org.apache.spark.streaming.scheduler.StreamInputInfo.(InputInfoTracker.scala:38)
> at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:250)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
> at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
> at 
> org.apache.spark.streaming.dstream.DStream$

[jira] [Assigned] (SPARK-27258) The value of "spark.app.name" or "--name" starts with number , which causes resourceName does not match regular expression

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27258:


Assignee: Apache Spark

> The value of "spark.app.name" or "--name" starts with number , which causes 
> resourceName does not match regular expression
> --
>
> Key: SPARK-27258
> URL: https://issues.apache.org/jira/browse/SPARK-27258
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: hehuiyuan
>Assignee: Apache Spark
>Priority: Minor
>
> {code:java}
> Exception in thread "main" 
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> POST at: https://xxx:xxx/api/v1/namespaces/xxx/services. Message: Service 
> "1min-machinereg-yf-1544604108931-driver-svc" is invalid: metadata.name: 
> Invalid value: "1min-machinereg-yf-1544604108931-driver-svc": a DNS-1035 
> label must consist of lower case alphanumeric characters or '-', start with 
> an alphabetic character, and end with an alphanumeric character (e.g. 
> 'my-name',  or 'abc-123', regex used for validation is 
> '[a-z]([-a-z0-9]*[a-z0-9])?'). Received status: Status(apiVersion=v1, 
> code=422, details=StatusDetails(causes=[StatusCause(field=metadata.name, 
> message=Invalid value: "1min-machinereg-yf-1544604108931-driver-svc": a 
> DNS-1035 label must consist of lower case alphanumeric characters or '-', 
> start with an alphabetic character, and end with an alphanumeric character 
> (e.g. 'my-name',  or 'abc-123', regex used for validation is 
> '[a-z]([-a-z0-9]*[a-z0-9])?'), reason=FieldValueInvalid, 
> additionalProperties={})], group=null, kind=Service, 
> name=1min-machinereg-yf-1544604108931-driver-svc, retryAfterSeconds=null, 
> uid=null, additionalProperties={}).
> {code}
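
A hypothetical sketch of the kind of sanitisation the report suggests: coerce a user-supplied app name into a valid DNS-1035 label (lower-case alphanumerics and '-', starting with a letter, ending with an alphanumeric) before building Kubernetes resource names from it. The function name and fallback are illustrative, not Spark's actual behaviour.
{code:scala}
def toDns1035Label(appName: String): String = {
  val normalized = appName.toLowerCase
    .replaceAll("[^a-z0-9-]", "-") // replace illegal characters with '-'
    .replaceAll("^[^a-z]+", "")    // must start with an alphabetic character
    .replaceAll("-+$", "")         // must end with an alphanumeric character
  if (normalized.isEmpty) "spark-app" else normalized
}

// toDns1035Label("1min-machinereg-yf") == "min-machinereg-yf"
{code}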



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27214:


Assignee: Apache Spark

> Upgrading locality level when lots of pending tasks have been waiting more 
> than locality.wait
> -
>
> Key: SPARK-27214
> URL: https://issues.apache.org/jira/browse/SPARK-27214
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Assignee: Apache Spark
>Priority: Major
>
> Currently, Spark's locality wait mechanism is not friendly to large jobs. When 
> the number of tasks is large (e.g. 1+) and there is a large number of 
> executors (e.g. 2000), executors may be launched on nodes where the locality 
> is not the best (not the nodes that hold the HDFS blocks). There are cases 
> where `TaskSetManager.lastLaunchTime` keeps being refreshed by tasks that 
> finish within `spark.locality.wait` but at a low rate (e.g. one task finishes 
> every `spark.locality.wait` seconds), so the locality level is never upgraded 
> and lots of pending tasks wait for a long time.
> In this case, when `spark.dynamicAllocation.enabled=true`, many executors may 
> be removed by the driver because they become idle, which finally slows down 
> the job.
> We encountered this issue in our production Spark cluster; it caused a lot of 
> wasted resources and slowed down users' applications.
> Actually, we can optimize this with the following formula:
> Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
> probabilityOfNextLocalitySchedule=0.5
> {code:java}
> maxTolerableStarvingTime = numTasksCanRun * medianOfTaskExecutionTime * 
> localityExecutionGainFactor * probabilityOfNextLocalitySchedule
> totalStarvingTime = sum(starvingTimeByTasks)
> if (totalStarvingTime > maxTolerableStarvingTime)
> {  upgrading locality level... }{code}
>  
>  
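
A Scala sketch of the proposed heuristic; the names follow the formula above and none of them are existing Spark APIs. The locality level is upgraded once the accumulated waiting time of pending tasks exceeds the estimated benefit of insisting on better locality.
{code:scala}
def shouldUpgradeLocalityLevel(
    starvingTimeByTasksMs: Seq[Long],
    numTasksCanRun: Int,
    medianTaskExecutionMs: Long,
    localityExecutionGainFactor: Double = 0.1,
    probabilityOfNextLocalitySchedule: Double = 0.5): Boolean = {
  val maxTolerableStarvingTime =
    numTasksCanRun * medianTaskExecutionMs *
      localityExecutionGainFactor * probabilityOfNextLocalitySchedule
  val totalStarvingTime = starvingTimeByTasksMs.sum.toDouble
  totalStarvingTime > maxTolerableStarvingTime
}
{code}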



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27295) Provision to provide initial values for each source node in personalised page rank - Graphx

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27295:


Assignee: Apache Spark

> Provision to provide initial values for each source node in personalised page 
> rank - Graphx
> ---
>
> Key: SPARK-27295
> URL: https://issues.apache.org/jira/browse/SPARK-27295
> Project: Spark
>  Issue Type: Improvement
>  Components: GraphX
>Affects Versions: 2.4.0
>Reporter: Eshwar S R
>Assignee: Apache Spark
>Priority: Major
>
> The present implementation of parallel personalized page rank algorithm takes 
> only node ids as the starting nodes for algorithm. And then it assigns 
> initial value of 1.0 to all those source nodes.
> But the user might also be interested in specifying the initial values for 
> each node. 
> I have done the required very small modification to the existing code to 
> achieve this. I thought it might help lot more people if I share it here, 
> hence raising a PR for the same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27319) Filter out dir based on PathFilter before listing them

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27319:


Assignee: Apache Spark

> Filter out dir based on PathFilter before listing them
> --
>
> Key: SPARK-27319
> URL: https://issues.apache.org/jira/browse/SPARK-27319
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xianyang Liu
>Assignee: Apache Spark
>Priority: Minor
>
> In `InMemoryFileIndex`, we should filter out directories based on the 
> PathFilter before listing them.
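
A rough sketch of the idea using the Hadoop FileSystem API (illustrative only, not the actual InMemoryFileIndex code): apply the PathFilter to a directory before issuing listStatus on it, so excluded directories are never listed at all.
{code:scala}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

def listLeafFiles(fs: FileSystem, dirs: Seq[Path], filter: PathFilter): Seq[FileStatus] =
  dirs.filter(dir => filter.accept(dir)).flatMap { dir =>
    val (subDirs, files) = fs.listStatus(dir).toSeq.partition(_.isDirectory)
    files.filter(f => filter.accept(f.getPath)) ++
      listLeafFiles(fs, subDirs.map(_.getPath), filter)
  }
{code}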



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27232) Ignore file locality in InMemoryFileIndex if spark.locality.wait is set to

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27232:


Assignee: Apache Spark

> Ignore file locality in InMemoryFileIndex if spark.locality.wait is set to
> --
>
> Key: SPARK-27232
> URL: https://issues.apache.org/jira/browse/SPARK-27232
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: EdisonWang
>Assignee: Apache Spark
>Priority: Minor
>
> `InMemoryFileIndex` needs to request file block location information in order 
> to do locality scheduling in `TaskSetManager`.
> Usually this is a time-consuming task. For example, in our production 
> environment there are 24 partitions with 149925 files totalling 83TB in size. 
> It takes about 10 minutes to request file block locations before submitting a 
> Spark job. Even after setting 
> `spark.sql.sources.parallelPartitionDiscovery.threshold` to 24 to parallelize 
> it, it still needs 2 minutes.
> This is wasted work if we don't care about file locality (for example, when 
> storage and computation are separated).
> So there should be a conf to control whether we need to send 
> `getFileBlockLocations` requests to the HDFS NameNode. If the user sets 
> `spark.locality.wait` to 0, file block location information is meaningless.
> In this PR, if `spark.locality.wait` is set to 0, Spark no longer requests 
> file location information, which saves anywhere from several seconds to minutes.
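
A hypothetical sketch of the proposed behaviour (not the actual InMemoryFileIndex code): skip the expensive getFileBlockLocations calls entirely when locality is irrelevant because spark.locality.wait is 0.
{code:scala}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem}

def blockLocationsFor(
    fs: FileSystem,
    status: FileStatus,
    localityWaitMs: Long): Array[BlockLocation] = {
  if (localityWaitMs == 0L) {
    // The caller treats missing locations as "no preferred hosts".
    Array.empty[BlockLocation]
  } else {
    fs.getFileBlockLocations(status, 0, status.getLen)
  }
}
{code}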



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27214:


Assignee: (was: Apache Spark)

> Upgrading locality level when lots of pending tasks have been waiting more 
> than locality.wait
> -
>
> Key: SPARK-27214
> URL: https://issues.apache.org/jira/browse/SPARK-27214
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, Spark's locality wait mechanism is not friendly to large jobs. When 
> the number of tasks is large (e.g. 1+) and there is a large number of 
> executors (e.g. 2000), executors may be launched on nodes where the locality 
> is not the best (not the nodes that hold the HDFS blocks). There are cases 
> where `TaskSetManager.lastLaunchTime` keeps being refreshed by tasks that 
> finish within `spark.locality.wait` but at a low rate (e.g. one task finishes 
> every `spark.locality.wait` seconds), so the locality level is never upgraded 
> and lots of pending tasks wait for a long time.
> In this case, when `spark.dynamicAllocation.enabled=true`, many executors may 
> be removed by the driver because they become idle, which finally slows down 
> the job.
> We encountered this issue in our production Spark cluster; it caused a lot of 
> wasted resources and slowed down users' applications.
> Actually, we can optimize this with the following formula:
> Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
> probabilityOfNextLocalitySchedule=0.5
> {code:java}
> maxTolerableStarvingTime = numTasksCanRun * medianOfTaskExecutionTime * 
> localityExecutionGainFactor * probabilityOfNextLocalitySchedule
> totalStarvingTime = sum(starvingTimeByTasks)
> if (totalStarvingTime > maxTolerableStarvingTime)
> {  upgrading locality level... }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27258) The value of "spark.app.name" or "--name" starts with number , which causes resourceName does not match regular expression

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27258:


Assignee: (was: Apache Spark)

> The value of "spark.app.name" or "--name" starts with number , which causes 
> resourceName does not match regular expression
> --
>
> Key: SPARK-27258
> URL: https://issues.apache.org/jira/browse/SPARK-27258
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: hehuiyuan
>Priority: Minor
>
> {code:java}
> Exception in thread "main" 
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> POST at: https://xxx:xxx/api/v1/namespaces/xxx/services. Message: Service 
> "1min-machinereg-yf-1544604108931-driver-svc" is invalid: metadata.name: 
> Invalid value: "1min-machinereg-yf-1544604108931-driver-svc": a DNS-1035 
> label must consist of lower case alphanumeric characters or '-', start with 
> an alphabetic character, and end with an alphanumeric character (e.g. 
> 'my-name',  or 'abc-123', regex used for validation is 
> '[a-z]([-a-z0-9]*[a-z0-9])?'). Received status: Status(apiVersion=v1, 
> code=422, details=StatusDetails(causes=[StatusCause(field=metadata.name, 
> message=Invalid value: "1min-machinereg-yf-1544604108931-driver-svc": a 
> DNS-1035 label must consist of lower case alphanumeric characters or '-', 
> start with an alphabetic character, and end with an alphanumeric character 
> (e.g. 'my-name',  or 'abc-123', regex used for validation is 
> '[a-z]([-a-z0-9]*[a-z0-9])?'), reason=FieldValueInvalid, 
> additionalProperties={})], group=null, kind=Service, 
> name=1min-machinereg-yf-1544604108931-driver-svc, retryAfterSeconds=null, 
> uid=null, additionalProperties={}).
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27402) Fix hadoop-3.2 test issue(except the hive-thriftserver module)

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27402:


Assignee: Apache Spark

> Fix hadoop-3.2 test issue(except the hive-thriftserver module)
> --
>
> Key: SPARK-27402
> URL: https://issues.apache.org/jira/browse/SPARK-27402
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Assignee: Apache Spark
>Priority: Major
>
> Fix the test issues in the sql/core and sql/hive modules for hadoop-3.2



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27343) Use ConfigEntry for hardcoded configs for spark-sql-kafka

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27343:


Assignee: Apache Spark

> Use ConfigEntry for hardcoded configs for spark-sql-kafka
> -
>
> Key: SPARK-27343
> URL: https://issues.apache.org/jira/browse/SPARK-27343
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: hehuiyuan
>Assignee: Apache Spark
>Priority: Minor
>
> Extract the hardcoded configuration parameters and build ConfigEntry objects for them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27354) Move incompatible code from the hive-thriftserver module to sql/hive-thriftserver/v1.2.1

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27354:


Assignee: (was: Apache Spark)

> Move incompatible code from the hive-thriftserver module to 
> sql/hive-thriftserver/v1.2.1
> 
>
> Key: SPARK-27354
> URL: https://issues.apache.org/jira/browse/SPARK-27354
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> When we upgrade the built-in Hive to 2.3.4, the current 
> {{hive-thriftserver}} module is no longer compatible, because of Hive changes 
> such as:
>  # HIVE-12442 HiveServer2: Refactor/repackage HiveServer2's Thrift code so 
> that it can be used in the tasks
>  # HIVE-12237 Use slf4j as logging facade
>  # HIVE-13169 HiveServer2: Support delegation token based connection when 
> using http transport
> So we should add a new {{hive-thriftserver}} module for Hive 2.3.4:
> 1. Add a new empty module for Hive 2.3.4 named {{hive-thriftserverV2}}.
> 2. Make {{hive-thriftserver}} activate only when testing with hadoop-2.7.
> 3. Make {{hive-thriftserverV2}} activate only when testing with hadoop-3.2.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27355) make query execution more sensitive to epoch message late or lost

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27355:


Assignee: Apache Spark

> make query execution more sensitive to epoch message late or lost
> -
>
> Key: SPARK-27355
> URL: https://issues.apache.org/jira/browse/SPARK-27355
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Genmao Yu
>Assignee: Apache Spark
>Priority: Minor
>
> In SPARK-23503, we enforce sequencing of committed epochs for Continuous 
> Execution. If a message for epoch n is lost and epoch (n + 1) is ready 
> for commit before epoch n is, epoch (n + 1) will wait for epoch n to be 
> committed first. In the extreme case, we will wait for 
> `epochBacklogQueueSize` (1 by default) epochs and then fail. There is 
> no need to wait that long before the query fails, and we can make the 
> condition more sensitive.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27348) HeartbeatReceiver doesn't remove lost executors from CoarseGrainedSchedulerBackend

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27348:


Assignee: (was: Apache Spark)

> HeartbeatReceiver doesn't remove lost executors from 
> CoarseGrainedSchedulerBackend
> --
>
> Key: SPARK-27348
> URL: https://issues.apache.org/jira/browse/SPARK-27348
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Shixiong Zhu
>Priority: Major
>
> When a heartbeat timeout happens in HeartbeatReceiver, it doesn't remove the 
> lost executor from CoarseGrainedSchedulerBackend. If an executor's connection 
> is not shut down gracefully, CoarseGrainedSchedulerBackend may never receive a 
> disconnect event, so it still thinks the lost executor is alive. 
> CoarseGrainedSchedulerBackend may then ask TaskScheduler to run tasks on this 
> lost executor; those tasks will never finish and the job will hang forever.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27413) Keep the same epoch pace between driver and executor.

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27413:


Assignee: (was: Apache Spark)

> Keep the same epoch pace between driver and executor.
> -
>
> Key: SPARK-27413
> URL: https://issues.apache.org/jira/browse/SPARK-27413
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Genmao Yu
>Priority: Minor
>
> The pace of epoch generation in the driver and of epoch pulling in the 
> executor is different. This results in many empty epochs for a partition if 
> the epoch pulling interval is larger than the epoch generation interval.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27425) Add count_if functions

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27425:


Assignee: (was: Apache Spark)

> Add count_if functions
> --
>
> Key: SPARK-27425
> URL: https://issues.apache.org/jira/browse/SPARK-27425
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Chaerim Yeo
>Priority: Minor
>
> Add an aggregation function which returns the number of records satisfying a 
> given condition.
> Presto supports the 
> [{{count_if}}|https://prestodb.github.io/docs/current/functions/aggregate.html]
>  function, which lets us write this concisely.
> However, Spark does not support it yet, so we need to write something like 
> {{COUNT(CASE WHEN some_condition THEN 1 END)}} or {{SUM(CASE WHEN 
> some_condition THEN 1 END)}}, which is painful.
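
A minimal sketch of the current workaround in the DataFrame API, assuming a SparkSession named {{spark}}: count the rows matching a predicate with a conditional aggregate, which is exactly what a built-in count_if would replace.
{code:scala}
import org.apache.spark.sql.functions.{col, count, when}

val df = spark.range(10).toDF("id")
// when() yields null for non-matching rows, and count() ignores nulls.
val evenCount = df.agg(count(when(col("id") % 2 === 0, true)).as("even_count"))
{code}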



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27294) Multi-cluster Kafka delegation token support

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27294:


Assignee: Apache Spark

> Multi-cluster Kafka delegation token support
> 
>
> Key: SPARK-27294
> URL: https://issues.apache.org/jira/browse/SPARK-27294
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Assignee: Apache Spark
>Priority: Major
>
> Kafka delegation token support only covers a single cluster at the moment.
> I've created a small document with the proposed Spark approach 
> [here|https://docs.google.com/document/d/1yuwIxKqUnzo5RxJDIqrWWC2s67hh5Tb1QtfIwEKVWtM/edit?usp=sharing].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27347) [MESOS] Fix supervised driver retry logic when agent crashes/restarts

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27347:


Assignee: Apache Spark

> [MESOS] Fix supervised driver retry logic when agent crashes/restarts
> -
>
> Key: SPARK-27347
> URL: https://issues.apache.org/jira/browse/SPARK-27347
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.2.1, 2.3.2, 2.4.0
>Reporter: Sam Tran
>Assignee: Apache Spark
>Priority: Major
>  Labels: mesos
>
> Ran into scenarios where {{--supervised}} Spark jobs were retried multiple 
> times when an agent would crash, come back, and re-register, even when those 
> jobs had already been relaunched on a different agent.
> That is:
>  * supervised driver is running on agent1
>  * agent1 crashes
>  * driver is relaunched on another agent as `-retry-1`
>  * agent1 comes back online and re-registers with the scheduler
>  * Spark relaunches the same job as `-retry-2`
>  * now there are two jobs running simultaneously and the first retry job is 
> effectively orphaned within Zookeeper
> This happens because when an agent comes back and re-registers, it sends a 
> {{TASK_FAILED}} status update for its old driver task. The previous logic would 
> indiscriminately remove the {{submissionId}} from Zookeeper's 
> {{launchedDrivers}} node and add it to the {{retryList}} node.
> Then, when a new offer came in, it would relaunch another -retry task even 
> though one was already running.
> Sample log looks something like this: 
> {code:java}
> 19/01/15 19:21:38 TRACE MesosClusterScheduler: Received offers from Mesos: 
> ... [offers] ...
> 19/01/15 19:21:39 TRACE MesosClusterScheduler: Using offer 
> 5d421001-0630-4214-9ecb-d5838a2ec149-O2532 to launch driver 
> driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001"
> ...
> 19/01/15 19:21:42 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001 state=TASK_STARTING message=''
> 19/01/15 19:21:43 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001 state=TASK_RUNNING message=''
> ...
> 19/01/15 19:29:12 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001 state=TASK_LOST message='health check timed 
> out' reason=REASON_SLAVE_REMOVED
> ...
> 19/01/15 19:31:12 TRACE MesosClusterScheduler: Using offer 
> 5d421001-0630-4214-9ecb-d5838a2ec149-O2681 to launch driver 
> driver-20190115192138-0001 with taskId: value: 
> "driver-20190115192138-0001-retry-1"
> ...
> 19/01/15 19:31:15 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001-retry-1 state=TASK_STARTING message=''
> 19/01/15 19:31:16 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001-retry-1 state=TASK_RUNNING message=''
> ...
> 19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001 state=TASK_FAILED message='Unreachable 
> agent re-reregistered'
> ...
> 19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001 state=TASK_FAILED message='Abnormal 
> executor termination: unknown container' reason=REASON_EXECUTOR_TERMINATED
> 19/01/15 19:33:45 ERROR MesosClusterScheduler: Unable to find driver with 
> driver-20190115192138-0001 in status update
> ...
> 19/01/15 19:33:47 TRACE MesosClusterScheduler: Using offer 
> 5d421001-0630-4214-9ecb-d5838a2ec149-O2729 to launch driver 
> driver-20190115192138-0001 with taskId: value: 
> "driver-20190115192138-0001-retry-2"
> ...
> 19/01/15 19:33:50 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001-retry-2 state=TASK_STARTING message=''
> 19/01/15 19:33:51 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001-retry-2 state=TASK_RUNNING message=''{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27388) expression encoder for avro objects

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27388:


Assignee: (was: Apache Spark)

> expression encoder for avro objects
> ---
>
> Key: SPARK-27388
> URL: https://issues.apache.org/jira/browse/SPARK-27388
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Taoufik DACHRAOUI
>Priority: Major
>
> *What changes were proposed in this pull request?*
> The PR adds support for bean objects, java.util.List, java.util.Map, 
> java.nio.ByteBuffer and Java enums to ScalaReflection. Unlike the existing 
> JavaBean encoder, properties can be named without the set/get prefix; this is 
> one of the key points that allows the encoding of Avro Fixed types (I believe 
> the other key point is that the addition must be in ScalaReflection).
> Reminder of Avro types:
>  * primitive types: null, boolean, int, long, float, double, bytes, string
>  * complex types: Records, Enums, Arrays, Maps, Unions, Fixed
> This PR supports simple unions (having a null type and a non-null type) but 
> not complex unions for the simple reason that the Avro compiler will generate 
> java code with type Object for all complex unions, and fields with simple 
> unions will be typed as the non-null type of the union.
>  
> *How was this patch tested?*
> Currently only one encodeDecodeTest has been added to ExpressionEncoderSuite; 
> the test uses the following Avro schema:
> {code:java}
> {"namespace": "org.apache.spark.sql.catalyst.encoders", "type": "record", 
> "name": "AvroExample1",
>  "fields": [
>  
> {"name":"mymoney","type":["null",{"type":"record","name":"Money","namespace":"org.apache.spark.sql.catalyst.encoders","fields":[
>  {"name":"amount","type":"float","default":0},
>  
> {"name":"currency","type":{"type":"enum","name":"Currency","symbols":["EUR","USD","BRL"]},"default":"EUR"}]}],
>  "default":null},
>  {"name": "myfloat", "type": "float"},
>  {"name": "mylong", "type": "long"},
>  {"name": "myint", "type": "int"},
>  {"name": "mydouble", "type": "double"},
>  {"name": "myboolean", "type": "boolean"},
>  {"name": "mystring", "type": "string"},
>  {"name": "mybytes", "type": "bytes"},
>  {"name": "myfixed", "type": {"type": "fixed", "name": "Magic", "size": 4}},
>  {"name": "myarray", "type": {"type": "array", "items": "string"}},
>  {"name": "mymap", "type": {"type": "map", "values": "int"}}
>  ] }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27348) HeartbeatReceiver doesn't remove lost executors from CoarseGrainedSchedulerBackend

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27348:


Assignee: Apache Spark

> HeartbeatReceiver doesn't remove lost executors from 
> CoarseGrainedSchedulerBackend
> --
>
> Key: SPARK-27348
> URL: https://issues.apache.org/jira/browse/SPARK-27348
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Shixiong Zhu
>Assignee: Apache Spark
>Priority: Major
>
> When a heartbeat timeout happens in HeartbeatReceiver, it doesn't remove the 
> lost executor from CoarseGrainedSchedulerBackend. If an executor's connection 
> is not shut down gracefully, CoarseGrainedSchedulerBackend may never receive a 
> disconnect event, so it still thinks the lost executor is alive. 
> CoarseGrainedSchedulerBackend may then ask TaskScheduler to run tasks on this 
> lost executor; those tasks will never finish and the job will hang forever.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27441) Add read/write tests to Hive serde tables(include Parquet vectorized reader)

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27441:


Assignee: Apache Spark

> Add read/write tests to Hive serde tables(include Parquet vectorized reader)
> 
>
> Key: SPARK-27441
> URL: https://issues.apache.org/jira/browse/SPARK-27441
> Project: Spark
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Assignee: Apache Spark
>Priority: Major
>
> The ORC and Parquet versions used in each configuration, before and after the 
> built-in Hive upgrade to 2.3.4:
> built-in Hive is 1.2.1:
> || ||ORC||Parquet||
> |Spark datasource table|1.5.5|1.10.1|
> |Spark hive table|Hive built-in|1.6.0|
> |Hive 1.2.1|Hive built-in|1.6.0|
> built-in Hive is 2.3.4:
> || ||ORC||Parquet||
> |Spark datasource table|1.5.5|1.10.1|
> |Spark hive table|1.5.5|1.8.1|
> |Hive 2.3.4|1.3.3|1.8.1|
>  We should add read/write tests for Hive serde tables.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27388) expression encoder for avro objects

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27388:


Assignee: Apache Spark

> expression encoder for avro objects
> ---
>
> Key: SPARK-27388
> URL: https://issues.apache.org/jira/browse/SPARK-27388
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Taoufik DACHRAOUI
>Assignee: Apache Spark
>Priority: Major
>
> *What changes were proposed in this pull request?*
> The PR adds support for bean objects, java.util.List, java.util.Map, 
> java.nio.ByteBuffer and Java enums to ScalaReflection. Unlike the existing 
> JavaBean encoder, properties can be named without the set/get prefix; this is 
> one of the key points that allows the encoding of Avro Fixed types (I believe 
> the other key point is that the addition must be in ScalaReflection).
> Reminder of Avro types:
>  * primitive types: null, boolean, int, long, float, double, bytes, string
>  * complex types: Records, Enums, Arrays, Maps, Unions, Fixed
> This PR supports simple unions (having a null type and a non-null type) but 
> not complex unions for the simple reason that the Avro compiler will generate 
> java code with type Object for all complex unions, and fields with simple 
> unions will be typed as the non-null type of the union.
>  
> *How was this patch tested?*
> Currently only one encodeDecodeTest has been added to ExpressionEncoderSuite; 
> the test uses the following Avro schema:
> {code:java}
> {"namespace": "org.apache.spark.sql.catalyst.encoders", "type": "record", 
> "name": "AvroExample1",
>  "fields": [
>  
> {"name":"mymoney","type":["null",{"type":"record","name":"Money","namespace":"org.apache.spark.sql.catalyst.encoders","fields":[
>  {"name":"amount","type":"float","default":0},
>  
> {"name":"currency","type":{"type":"enum","name":"Currency","symbols":["EUR","USD","BRL"]},"default":"EUR"}]}],
>  "default":null},
>  {"name": "myfloat", "type": "float"},
>  {"name": "mylong", "type": "long"},
>  {"name": "myint", "type": "int"},
>  {"name": "mydouble", "type": "double"},
>  {"name": "myboolean", "type": "boolean"},
>  {"name": "mystring", "type": "string"},
>  {"name": "mybytes", "type": "bytes"},
>  {"name": "myfixed", "type": {"type": "fixed", "name": "Magic", "size": 4}},
>  {"name": "myarray", "type": {"type": "array", "items": "string"}},
>  {"name": "mymap", "type": {"type": "map", "values": "int"}}
>  ] }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27366) Spark scheduler internal changes to support GPU scheduling

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27366:


Assignee: Apache Spark  (was: Xingbo Jiang)

> Spark scheduler internal changes to support GPU scheduling
> --
>
> Key: SPARK-27366
> URL: https://issues.apache.org/jira/browse/SPARK-27366
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Apache Spark
>Priority: Major
>
> Update Spark job scheduler to support accelerator resource requests submitted 
> at application level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27343) Use ConfigEntry for hardcoded configs for spark-sql-kafka

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27343:


Assignee: (was: Apache Spark)

> Use ConfigEntry for hardcoded configs for spark-sql-kafka
> -
>
> Key: SPARK-27343
> URL: https://issues.apache.org/jira/browse/SPARK-27343
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: hehuiyuan
>Priority: Minor
>
> Extract the hardcoded configuration parameters and build ConfigEntry objects for them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27355) make query execution more sensitive to epoch message late or lost

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27355:


Assignee: (was: Apache Spark)

> make query execution more sensitive to epoch message late or lost
> -
>
> Key: SPARK-27355
> URL: https://issues.apache.org/jira/browse/SPARK-27355
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Genmao Yu
>Priority: Minor
>
> In SPARK-23503, we enforce sequencing of committed epochs for Continuous 
> Execution. If a message for epoch n is lost and epoch (n + 1) is ready 
> for commit before epoch n is, epoch (n + 1) will wait for epoch n to be 
> committed first. In the extreme case, we will wait for 
> `epochBacklogQueueSize` (1 by default) epochs and then fail. There is 
> no need to wait that long before the query fails, and we can make the 
> condition more sensitive.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27024) Executor interface for cluster managers to support GPU resources

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27024:


Assignee: Apache Spark  (was: Thomas Graves)

> Executor interface for cluster managers to support GPU resources
> 
>
> Key: SPARK-27024
> URL: https://issues.apache.org/jira/browse/SPARK-27024
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Xingbo Jiang
>Assignee: Apache Spark
>Priority: Major
>
> The executor interface shall deal with the resources allocated to the 
> executor by cluster managers (Standalone, YARN, Kubernetes). The executor 
> either needs to be told which resources it was given or needs to discover 
> them, so that it can sync with the driver and expose the available 
> resources for task scheduling.
> Note this is part of a bigger feature for GPU-aware scheduling and only covers 
> how the executor finds the resources. The general flow:
>  * users ask for a certain set of resources, for instance number of gpus - 
> each cluster manager has a specific way to do this.
>  * cluster manager allocates a container or set of resources (standalone mode)
>  * When spark launches the executor in that container, the executor either 
> has to be told what resources it has or it has to auto discover them.
>  * Executor has to register with Driver and tell the driver the set of 
> resources it has so the scheduler can use that to schedule tasks that 
> requires a certain amount of each of those resources



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27299) Design: Property graph construction, save/load, and query APIs

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27299:


Assignee: Martin Junghanns  (was: Apache Spark)

> Design: Property graph construction, save/load, and query APIs
> --
>
> Key: SPARK-27299
> URL: https://issues.apache.org/jira/browse/SPARK-27299
> Project: Spark
>  Issue Type: Story
>  Components: Graph
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Martin Junghanns
>Priority: Major
>
> Design doc for property graph and Cypher queries.
> * Construct a property graph.
> * How nodes and relationships map to DataFrames
> * Save/load.
> * Cypher query.
> * Support Scala/Python/Java.
> * Dependencies
> * Test
> Out of scope:
> * Graph algorithms.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27347) [MESOS] Fix supervised driver retry logic when agent crashes/restarts

2019-04-25 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27347:


Assignee: (was: Apache Spark)

> [MESOS] Fix supervised driver retry logic when agent crashes/restarts
> -
>
> Key: SPARK-27347
> URL: https://issues.apache.org/jira/browse/SPARK-27347
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.2.1, 2.3.2, 2.4.0
>Reporter: Sam Tran
>Priority: Major
>  Labels: mesos
>
> Ran into scenarios where {{--supervised}} Spark jobs were retried multiple 
> times when an agent would crash, come back, and re-register, even when those 
> jobs had already been relaunched on a different agent.
> That is:
>  * a supervised driver is running on agent1
>  * agent1 crashes
>  * the driver is relaunched on another agent as `-retry-1`
>  * agent1 comes back online and re-registers with the scheduler
>  * Spark relaunches the same job as `-retry-2`
>  * now there are two jobs running simultaneously, and the first retry job is 
> effectively orphaned within ZooKeeper
> This happens because when an agent comes back and re-registers, it sends a 
> {{TASK_FAILED}} status update for its old driver task. The previous logic would 
> indiscriminately remove the {{submissionId}} from ZooKeeper's {{launchedDrivers}} 
> node and add it to the {{retryList}} node.
> Then, when a new offer came in, it would relaunch another {{-retry}} task even 
> though one was already running (a simplified sketch of the missing guard 
> follows after the sample log below).
> A sample log looks something like this: 
> {code:java}
> 19/01/15 19:21:38 TRACE MesosClusterScheduler: Received offers from Mesos: 
> ... [offers] ...
> 19/01/15 19:21:39 TRACE MesosClusterScheduler: Using offer 
> 5d421001-0630-4214-9ecb-d5838a2ec149-O2532 to launch driver 
> driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001"
> ...
> 19/01/15 19:21:42 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001 state=TASK_STARTING message=''
> 19/01/15 19:21:43 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001 state=TASK_RUNNING message=''
> ...
> 19/01/15 19:29:12 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001 state=TASK_LOST message='health check timed 
> out' reason=REASON_SLAVE_REMOVED
> ...
> 19/01/15 19:31:12 TRACE MesosClusterScheduler: Using offer 
> 5d421001-0630-4214-9ecb-d5838a2ec149-O2681 to launch driver 
> driver-20190115192138-0001 with taskId: value: 
> "driver-20190115192138-0001-retry-1"
> ...
> 19/01/15 19:31:15 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001-retry-1 state=TASK_STARTING message=''
> 19/01/15 19:31:16 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001-retry-1 state=TASK_RUNNING message=''
> ...
> 19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001 state=TASK_FAILED message='Unreachable 
> agent re-reregistered'
> ...
> 19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001 state=TASK_FAILED message='Abnormal 
> executor termination: unknown container' reason=REASON_EXECUTOR_TERMINATED
> 19/01/15 19:33:45 ERROR MesosClusterScheduler: Unable to find driver with 
> driver-20190115192138-0001 in status update
> ...
> 19/01/15 19:33:47 TRACE MesosClusterScheduler: Using offer 
> 5d421001-0630-4214-9ecb-d5838a2ec149-O2729 to launch driver 
> driver-20190115192138-0001 with taskId: value: 
> "driver-20190115192138-0001-retry-2"
> ...
> 19/01/15 19:33:50 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001-retry-2 state=TASK_STARTING message=''
> 19/01/15 19:33:51 INFO MesosClusterScheduler: Received status update: 
> taskId=driver-20190115192138-0001-retry-2 state=TASK_RUNNING message=''{code}
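A simplified sketch of the kind of guard the fix needs in the status-update path is shown below. The names (TrackedDriver, launchedDrivers, retryList, handleStatusUpdate) only approximate the real MesosClusterScheduler state; this is not the actual Spark code, just an illustration of ignoring stale updates for task ids that are no longer tracked.

{code:scala}
// Sketch only: guard retries on the task id actually being tracked, so a stale
// TASK_FAILED from a re-registering agent does not trigger a duplicate relaunch.
import org.apache.mesos.Protos.{TaskState, TaskStatus}

case class TrackedDriver(submissionId: String, currentTaskId: String)

// launchedDrivers mirrors ZooKeeper's launchedDrivers node: submissionId -> state.
def handleStatusUpdate(
    status: TaskStatus,
    launchedDrivers: collection.mutable.Map[String, TrackedDriver],
    retryList: collection.mutable.ListBuffer[String]): Unit = {
  val taskId = status.getTaskId.getValue
  // "driver-20190115192138-0001-retry-1" -> submissionId "driver-20190115192138-0001"
  val submissionId = taskId.split("-retry-")(0)

  launchedDrivers.get(submissionId) match {
    case Some(driver) if driver.currentTaskId == taskId =>
      // The update refers to the task we are currently tracking: retry on failure.
      if (status.getState == TaskState.TASK_FAILED ||
          status.getState == TaskState.TASK_LOST) {
        launchedDrivers.remove(submissionId)
        retryList += submissionId
      }
    case Some(_) =>
      // Stale update for an old task id from a crashed agent that re-registered:
      // the driver already runs as a newer -retry task, so ignore it.
    case None =>
      // Unknown driver; nothing to do.
  }
}
{code}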





